# coding:utf-8 # !/usr/bin/env python3 # # Copyright (c) 2007-2009 Canonical Ltd. # # Author: Oliver Grawert # # Modifications 2013-2023 from papoteur # and Geiger David # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. # Requires python3-parted import sys import importlib try: from isodumper import version except: import version ########### # imports # ########### import yui import time import os import re import gettext from subprocess import Popen, PIPE from pydbus import SystemBus import gnupg import datetime import threading from gi.repository import GLib from queue import SimpleQueue import psutil import manatools.args as args import manatools.ui.common as common import manatools.ui.basedialog as basedialog class NoUDisks2(Exception): pass class UDisks2(object): BLOCK = "org.freedesktop.UDisks2.Block" DRIVE = "org.freedesktop.UDisks2.Drive" SERVICE = "org.freedesktop.UDisks2" def __init__(self): self.bus = SystemBus() try: self.iface = self.bus.get(self.SERVICE) except: raise NoUDisks2() self._udisks2_obj_manager = self.iface["org.freedesktop.DBus.ObjectManager"] def find_devices(self): objects = self._udisks2_obj_manager.GetManagedObjects() re_drive = re.compile("(?P.*?/drives/(?P.*))") re_block = re.compile("(?P.*?/block_devices/(?P.*))") devs = [ m.groupdict() for m in [re_drive.match(path) for path in (objects.keys())] if m ] blocks = [ m.groupdict() for m in [re_block.match(path) for path in (objects.keys())] if m ] list = [] for block in blocks: if not ( self.SERVICE + ".Partition" in objects[block["path"]] ): # we skip partition blocks for dev in devs: if dev["path"] == objects[block["path"]][self.BLOCK]["Drive"]: # we look for device which is referenced in Drive attribute of the current block dev_obj = objects[dev["path"]][self.DRIVE] if ( dev_obj["ConnectionBus"] == "usb" or dev_obj["ConnectionBus"] == "sdio" ) and ( dev_obj["Removable"] == 1 or dev_obj["MediaRemovable"] == 1 ): # we keep only blocks where device ConnectionBus is sdio or usb item = [] vend = dev_obj["Vendor"] if vend == "": name = dev_obj["Model"] else: name = vend + " " + dev_obj["Model"] path = "/dev/" + block["path"].split("/")[-1] size = dev_obj["Size"] item.append(name) item.append(path) item.append(size) list.append(item) return list def do_unmount(self, target): # keep only the end, like sdb target = os.path.basename(target) objects = self._udisks2_obj_manager.GetManagedObjects() re_block = re.compile("(?P.*?/block_devices/(?P.*))") blocks = [ m.groupdict() for m in [re_block.match(path) for path in (objects.keys())] if m ] for block in blocks: if ( self.SERVICE + ".Partition" in objects[block["path"]] ): # we skip non partition blocks # Table should be something like '/org/freedesktop/UDisks2/block_devices/sdb' if ( objects[block["path"]][self.SERVICE + ".Partition"]["Table"].split( "/" )[-1] == target ): if ( self.SERVICE + ".Encrypted" in objects[block["path"]] ): # Is the partition encrypted ? path_to_encrypted = objects[block["path"]][ self.SERVICE + ".Encrypted" ]["CleartextDevice"] if path_to_encrypted != "/": iface = self.bus.get(self.SERVICE, path_to_encrypted) fs = iface[self.SERVICE + ".Filesystem"] if fs.MountPoints: # partition is mounted try: fs.Unmount({}) except GLib.GError as e: print(str(e)) return False, _( "A partition is busy. Try to free it before starting again." ) except: raise Exception(str(e)) iface = self.bus.get(self.SERVICE, block["path"]) fs = iface[self.SERVICE + ".Encrypted"] fs.Lock({}) else: iface = self.bus.get(self.SERVICE, block["path"]) if self.SERVICE + ".Filesystem" in objects[block["path"]]: fs = iface[self.SERVICE + ".Filesystem"] if fs.MountPoints: # partition is mounted try: fs.Unmount({}) except GLib.GError as e: print(str(e)) return False, _( "A partition is busy. Try to free it before starting again." ) except: raise Exception(str(e)) return True, "" def get_partitions(self, target): # keep only the end, like sdb target = os.path.basename(target) partitions = [] objects = self._udisks2_obj_manager.GetManagedObjects() re_block = re.compile("(?P.*?/block_devices/(?P.*))") blocks = [ m.groupdict() for m in [re_block.match(path) for path in (objects.keys())] if m ] for block in blocks: if ( self.SERVICE + ".Partition" in objects[block["path"]] ): # we skip non partition blocks # Table should be something like '/org/freedesktop/UDisks2/block_devices/sdb' if ( objects[block["path"]][self.SERVICE + ".Partition"]["Table"].split( "/" )[-1] == target ): partition = objects[block["path"]][self.SERVICE + ".Block"] partitions.append( { "device": bytes(partition["Device"]) .decode("UTF-8", "replace") .strip("\0"), "label": partition["IdLabel"], "type": partition["IdType"], } ) return partitions def eject(self, device): """device is expected like /dev/sda""" block = os.path.basename(device) iface = self.bus.get( self.SERVICE, "/org/freedesktop/UDisks2/block_devices/" + block ) drive = iface.Get(self.BLOCK, "Drive") i_drive = self.bus.get(self.SERVICE, drive) i_drive.Eject({}) class Info(object): def __init__(self, title, richtext, text): self.title = title self.richtext = richtext self.text = text class ParseCLI(args.AppArgs): def __init(self, command): super().__init__(command) class Logging(): def __init__(self, debug, logger): # Init logging in Magiback self.bus = SystemBus() self.logremote = self.bus.get("org.mageia.Magiback", "Logging") self.logger = logger self.debug_level = debug def debug(self, message): if self.debug_level: self.logremote.debug(message) def info(self, message): self.logger.appendLines(message + "\n") self.logremote.info(message) def warning(self, message): self.logger.appendLines(message + "\n") self.logremote.warning(message) def error(self, message): self.logger.appendLines(message + "\n") self.logremote.error(message) class IsoDumper(basedialog.BaseDialog): # Maximal length for partition label label_length = { "fat32": 11, "ext4": 16, "ntfs": 32, "exfat": 11 } def get_devices(self, selected=None): self.list = self.u.find_devices() while len(self.list) == 0: if self.nodevDialog(): self.list = self.u.find_devices() else: self._running = False return False self.devicelist.addItem("", False) if len(self.list) > 0: for name, path, size in self.list: if size != 0: label = self.device_label(name, path, size) sel = (label == selected) if selected else False self.devicelist.addItem(label, sel) return True def device_label(self, name, path, size): return f"{name} ({path.lstrip()}) {self.sizeof_fmt(size)}" def udev_wait(self, operation): wait = Popen(["udevadm", "settle", "--timeout=15"], stderr=PIPE) wait.communicate() working = True while working: time.sleep(0.5) wait.poll() rc = wait.returncode if not (rc is None): wait = None working = False if rc != 0: self.return_state = False self.return_message = _("Timeout reached when {}".format(operation)) return False return True def update_list(self): self.devicelist.deleteAllItems() self.get_devices() self.img_name = "" self.ima.setLabel(self.ChooseImage) self.backup_select.setLabel(self.ChooseImage) self.dialog.recalcLayout() def update_list_on_event(self): selitem = self.devicelist.selectedItem().label() self.devicelist.deleteAllItems() self.get_devices(selected=selitem) if self.devicelist.selectedItem().label() != selitem: self.restore() def refresh_signal(self, device, params): self.devicelist.deleteAllItems() self.get_devices() self.dialog.recalcLayout() def device_selected(self): selitem = self.devicelist.selectedItem() if selitem is not None: self.dev = selitem.label() self.logger.debug(f"Selected {self.dev}") if self.dev != "": for name, path, size in self.list: if self.dev == self.device_label(name, path, size): self.deviceSize = size self.device_name = name.rstrip().replace(" ", "") message = _("Target Device: {}").format(self.dev) self.logger.info(message) partitions = self.u.get_partitions( self.dev.split("(")[1].split(")")[0] ) # I18N: verb in singular 3rd person self.logger.info(_("Contains this/these partition(s)")) # I18N: don't translate keywords inside braces {} for partition in partitions: self.logger.info( _("{device}: Type={type}, Label={label}").format( device=partition["device"], type=partition["type"], label=partition["label"], ) ) if len(partitions) == 0: # I18N: None refers to partition (no partitions on the selected device) self.logger.info(_("None")) self.partition_cbox.setEnabled() self.partition_cbox.setChecked(False) self.backup_cbox.setEnabled() self.backup_cbox.setChecked(False) self.write_cbox.setEnabled() self.write_cbox.setChecked(False) self.cryptcb.setChecked(False) break else: self.partition_cbox.setDisabled() self.partition_cbox.setChecked(False) self.backup_cbox.setDisabled() self.backup_cbox.setChecked(False) self.write_cbox.setDisabled() self.write_cbox.setChecked(False) self.cryptcb.setChecked(False) @staticmethod def sizeof_fmt(num): # I18N these are units for files size for unit in [_("B"), _("KiB"), _("MiB"), _("GiB"), _("TiB")]: if abs(num) < 1024.0: return "%3.3f %s" % (num, unit) num /= 1024.0 return "%.1f %s" % (num, _("TiB")) def backup_choosed(self): # Add .img if not specified self.backup_img_name = yui.YUI.app().askForSaveFileName( "", "*.img", _("Backup to:") ) if self.backup_img_name != "": if not self.backup_img_name.lower().endswith(".img"): self.backup_img_name = self.backup_img_name + ".img" head, tail = os.path.split(self.backup_img_name) self.backup_dest = self.backup_img_name self.backup_select.setLabel(tail) self.dialog.recalcLayout() self.interface_active_state() self.backup_is_selected = True else: self.backup_is_selected = False def get_sum(self, source): self.return_state = False self.signature_checked = False # Check if the sum file has a valid signature gpg = gnupg.GPG() gpg.encoding = "utf-8" # Use Mageia public key mageia_keyid = "835E41F4EDCA7A90" self.sum_type = "sha3" sig_file = "{}.{}.gpg".format(source, self.sum_type) self.source_file = "{}.{}".format(source, self.sum_type) try: keys_list = gpg.list_keys() except Exception as e: self.signature_found = False self.logger.warning(str(e)) self.logger.warning(_("GPG signatures database can not be read")) return key_present = False for entry in keys_list: if mageia_keyid == entry["keyid"]: if entry["expires"] and ( datetime.datetime.now().timestamp() > float(entry["expires"]) ): self.logger.warning("Mageia key expired, reloading") else: self.logger.debug("Mageia key already present") key_present = True break try: if not key_present: gpg.recv_keys("keyserver.ubuntu.com", mageia_keyid) self.sum_check_searched = True with open(sig_file, "rb") as g: self.signature_found = True verified = gpg.verify_file(g, close_file=False) if verified: self.signature_checked = True self.logger.debug("signature checked") g.close() else: g.seek(0) verified = gpg.verify_file(g, self.source_file) if verified: self.signature_checked = True self.logger.debug("Detached signature is OK") else: self.signature_checked = False self.logger.warning("Signature is false") except Exception as e: self.signature_found = False self.logger.warning(str(e)) self.logger.warning( _("Signature file {} not found\n" + _("or key expired")).format( sig_file ) ) def do_format(self): # code, format_type, name = self.ask_format() if self.partition_cb.value() != "": self.operation = True format_type = list(self.format_type.keys())[ list(self.format_type.values()).index(self.partition_cb.value()) ] if format_type == "persist": self.emergency( _("Persistence partition is to use when writing a Live ISO image.") ) return target = self.dev.split("(")[1].split(")")[0] info = Info( _("Formatting confirmation in {}".format(self.partition_cb.value())), True, self.warning, ) if self.ask_YesOrNo(info): name = self.partition_label.value() if format_type == "fat32" or format_type == "exfat": name = name.upper()[:11] elif format_type == "ntfs": name = name[:32] rc = self.raw_format(target, format_type, name) self.operation = False if rc == 0: return True, _("The device was formatted successfully.") elif rc == 5: return False, _("An error occurred while creating a partition.") elif rc == 127: return False, _("Authentication error.") else: return False, _("An error {} occurred.".format(rc)) def initial_state(self): self.update_list() self.image_is_selected = False self.backup_is_selected = False self.backup_select.setDisabled() self.backup_cbox.setDisabled() self.backup_dest = "" self.backup_select.setLabel(self.ChooseImage) self.partition_cbox.setDisabled() self.ima.setDisabled() self.img_name = "" self.ima.setLabel(self.ChooseImage) self.write_cbox.setDisabled() self.devicelist.setEnabled() self.progress.setLabel(_("Progress")) self.progress.setValue(0) self.progress.setDisabled() self.refreshbt.setEnabled() self.partition_cb.setDisabled() self.partition_label.setDisabled() self.partition_label.setValue("") self.cryptcb.setDisabled() self.cryptkey.setDisabled() self.cryptkey.setValue("") self.start_bt.setDisabled() def raw_format(self, usb_path, fstype, label): self.u.do_unmount(usb_path) self.udev_wait(_("unmounting")) self.operation = True if os.geteuid() > 0: launcher = "pkexec" self.process = Popen( [ launcher, "/usr/bin/python3", "-u", "/usr/lib/isodumper/raw_format.py", "-d", usb_path, "-f", fstype, "-l", label, "-u", str(os.geteuid()), "-g", str(os.getgid()), ], shell=False, stdout=PIPE, preexec_fn=os.setsid, ) else: self.process = Popen( [ "/usr/bin/python3", "-u", "/usr/lib/isodumper/raw_format.py", "-d", usb_path, "-f", fstype, "-l", label, "-u", str(os.geteuid()), "-g", str(os.getgid()), ], shell=False, stdout=PIPE, preexec_fn=os.setsid, ) working = True while working: time.sleep(0.5) self.process.poll() rc = self.process.returncode if rc is None: working = True else: self.process = None working = False return rc def backup_go(self): self.init_iface() self.wip_unsensitive() self.progress.setEnabled() self.progress.setLabel(_("Backup")) dest = self.backup_img_name if os.path.exists(dest): info = Info( _("Backup confirmation"), True, _("Do you want to overwrite the file?") ) if not (self.ask_YesOrNo(info)): self.returncode = 1 return False st = os.statvfs(os.path.dirname(dest)) free = st.f_bavail * st.f_frsize if free < self.deviceSize: sizeM = str(self.deviceSize / (1024 * 1024)) message = _( "The destination directory is too small to receive the backup (%s Mb needed)" ) % (sizeM) self.logger.warning(message) self.emergency(message) self.initial_state() return False, message else: self.operation = True self.returncode = 0 source = self.dev.split("(")[1].split(")")[0] message = _("Backup to: {}").format(dest) self.logger.info(message) # Writing step # set flag and provide uid, gid of current user to set the owner of the backup file at end self.iface.set_backup_mode(os.getuid(), os.getgid()) self.iface.do_write(source, dest, self.deviceSize) self.iface.unset_backup_mode() while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(0.2) success, message = self.iface.end() if not success: self.emergency(message) self.initial_state() return False, message # I18N: don't translate source nor target return True, _("{source} successfully written to {target}").format( source=source.split("/")[-1], target=dest ) def do_write(self): self.init_iface() self.wip_unsensitive() self.progress.setEnabled() self.progress.setLabel("Writing") source = self.img_name target = self.dev.split("(")[1].split(")")[0] b = os.path.getsize(source) if b > (self.deviceSize): message = _("The device is too small to contain the ISO file.") self.logger.warning(message) self.emergency(message) else: # I18N: don't translate source nor target self.progress.setLabel( _("Writing {source} to {target}").format( source=source.split("/")[-1], target=target.split("/")[-1] ) ) message = _("Executing copy from {source} to {target}").format( source=source, target=target ) self.operation = True self.logger.info(message) success, message = self.u.do_unmount(target) self.udev_wait(_("unmounting")) if success: # Writing step self.iface.do_write(source, target, b) progress = self.iface.progress while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(0.5) success, message = self.iface.end() if success: # I18N: don't translate source nor target message = _("Image {source} written to {target}").format( source=source.split("/")[-1], target=target ) message += "\n" + _("Bytes written: {}").format(str(b)) self.logger.info(message) self.progress.setLabel(_("Checking ") + target.split("/")[-1]) self.progress.setValue(0) self.dialog.pollEvent() # Checking self.iface.check_write(target, source) progress = 0 while progress < 100: self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(0.5) progress = self.iface.progress nowarning, message = self.iface.end() self.progress.setEnabled() # Add persistent partition if asked, persistent or not if (self.partition_cbox.isChecked()): self.progress.setLabel(_("Adding partition")) self.progress.setValue(0) if self.cryptcb.isChecked() and (self.partition_cb.value() == "ext4"): if self.cryptkey.value() == "": message = _( "No key for encrypted partition provided. Adding the partition aborted." ) self.logger.warning(message) else: self.iface.do_persistence( target, self.partition_label.value(), self.cryptkey.value(), self.partition_cb.value() ) while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.progress.setLabel(self.iface.message) self.dialog.pollEvent() time.sleep(0.5) if self.iface.state: message = _( "Added encrypted partition" ) self.logger.info(message) else: self.logger.error( _( "Adding encrypted partition failed" ) ) self.logger.error(self.iface.message) nowarning = False else: self.iface.do_persistence( target, self.partition_label.value(), "", self.partition_cb.value() ) while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(0.5) if self.iface.state: message = _("Added partition") else: message = _("Adding partition failed") self.logger.warning(self.iface.message) nowarning = False self.logger.warning(message) # Unmount if partitions are automatically mounted and then eject self.progress.setValue(100) self.dialog.pollEvent() success, message = self.u.do_unmount(target) self.udev_wait(_("unmounting")) if success: self.u.eject(target) if nowarning: self.success() else: self.warn() else: self.emergency(message) self.initial_state() else: self.emergency(message) self.initial_state() else: self.emergency(message) self.initial_state() self.initial_state() self.operation = False def success(self): self.operation = False self.final_unsensitive() info = Info( _("Success"), True, _("The operation completed successfully.") + "\n" + _( "You are free to unplug it now, a logfile \n\ /var/log/magiback.log has been saved." ), ) if self.ask_OK(info): return def warn(self): self.operation = False self.final_unsensitive() info = Info( _("Warning"), True, _( "The operation completed, but with anomalies.\n\ Check carefully the messages in log view" ) + "\n" + _( "You are free to unplug it now, a logfile \n\ /var/log/magiback.log has been saved." ), ) if self.ask_OK(info): return def confirm_close(self, *args): if not self.operation: # no writing , backup nor format running self.close() else: # writing , backup or format running info = Info( _("Warning"), True, _( "Writing is in progress. Exiting during writing \n\ will make the device or the backup unusable.\n\ Are you sure you want to quit during writing?" ), ) if self.ask_YesOrNo(info): return True else: return False def emergency(self, message): self.operation = False self.returncode = 1 self.final_unsensitive() info = Info(_("Error"), True, message) self.ask_OK(info) def final_unsensitive(self): self.ima.setDisabled() self.devicelist.setDisabled() self.write_cbox.setDisabled() self.backup_cbox.setDisabled() self.progress.setDisabled() self.backup_select.setDisabled() self.partition_cbox.setDisabled() def wip_unsensitive(self): self.ima.setDisabled() self.devicelist.setDisabled() self.backup_cbox.setDisabled() self.backup_select.setDisabled() self.write_cbox.setDisabled() self.refreshbt.setDisabled() self.partition_cbox.setDisabled() self.partition_cb.setDisabled() self.partition_label.setDisabled() self.cryptcb.setDisabled() self.cryptkey.setDisabled() self.start_bt.setDisabled() def close(self): # to exit from _handleEvents loop self._running = False #def logger(self, text): #self.logview.appendLines(text + "\n") def activate_devicelist(self): self.devicelist.setEnabled() message = _("ISO Image to copy: ") + self.img_name self.logger.info(message) # self.chooser.set_tooltip_text(self.img_name) def help_dialog(self): info = Info( _("IsoDumper"), True, _("Mageia IsoDumper") + "
\ ----------------
" + _( "This GUI program is primarily for safely writing a bootable ISO image to a USB flash drive, \ an operation devious & potentially hazardous when done by hand. As a bonus, it can also back up the \ entire previous contents of the flash drive onto the hard disc, and restore the flash drive \ to its previous state subsequently." ) + "
" + _("It gives also a feature for formatting the USB device.") + "

" + _( "IsoDumper can be launched either from the menus, or a user console with the command 'isodumper'." ) + "
" + _( "The root password is solicited when this is necessary for the program's operation." ) + "
" + _( "The flash drive can be inserted beforehand or once the program is started. In the latter case, a \ dialogue will say that there is no flash drive inserted, and allow a 'retry' to find it once it is.
\ (You may have to close any automatically opened File Manager window)." ) + "

" + _( "The fields of the main window are as follows:
\ - Device to work on: the device of the USB flash drive, a drop-down list to choose from.
\ - Write Image: to choose the source ISO image *.iso (or flash drive backup file *.img) to write out.
\ - Write to device: This button launches the operation - with a prior warning dialogue." ) + "
" + _( "- Add a persistent partition: the remaining space will be used in a new partition where data from the Live system can be written and recovered between sessions." ) + "
" + _( "- Encrypt: the persistent partition will be encrypted with the key provided in Key field." ) + "
" + _("The operation is shown in the progress bar beneath.") + "
" + _( "- Backup to: define the name and placement of the backup image file. The current flash drive \ will be backed up to a disc file. Note that the entire flash drive is preserved, regardless of its \ actual contents; ensure that you have the necessary free disc space (the same size as the USB device). \ This backup file can be used later to restore the flash drive by selecting it as the source *.img file to write out." ) + "
" + _("- Backup the device: launch the backup operation.") + "
" + _( "- Format the device: create an unique partition on the entire volume in the specified format in FAT, \ exFAT, NTFS or ext. You can specify a volume name and the format in a new dialog box." ) + "
", ) if self.ask_OK(info): return def init_iface(self): # Prepare interface on DBus try: self.iface except: self.bus = SystemBus() self.iface = self.bus.get("org.mageia.Magiback", "Isodumper") def __init__(self, debug=False): APP = "isodumper" # Call translation catalog gettext.install(APP) # Check that there is no other instance running current_pid = psutil.Process().pid proc_iter = psutil.process_iter(attrs=["pid", "name"]) for p in proc_iter: if (p.info["name"] == APP) and (p.info["pid"] != current_pid): info = Info( _("Error"), True, _("There is another instance of Isodumper already running."), ) self.ask_OK(info) yui.YUILoader.deleteUI() self._running = False return # define size of the selected device self.deviceSize = 0 # Operation running self.operation = False self.ChooseImage = _("Choose an image") self.warning = _( "Warning\nThis will destroy all data on the target device,\n\ are you sure you want to proceed?\n\ If you say ok here, please do not unplug\ the device during the following operation." ) """ Init/Constructor for the 'widgets' """ self.dialog = basedialog.BaseDialog.__init__( self, _("Isodumper {}").format(version.RELEASE), "", basedialog.DialogType.POPUP, 100, 10, ) yui.YUI.app().setApplicationIcon("/usr/share/icons/isodumper.png") def UIlayout(self, layout): # create the main gui # +---------------------+ # + banner + # +---------------------+ # | devicebox | # +---+-----+------+----+ # + L | ima | writebt | # +---+-----+------+----+ # + L | backup_select | backupbt | # +---+-----+------+----+ # + F | formatbt | # +----------------+----+ # | progress | # +---------------------+ # | report | # +---------------------+ # | refreshbt | aboutbt | helpbt | quitbt | # +---------------------+ # self.ancrage = self.factory.createReplacePoint(self.dialog) self.box = self.factory.createVBox(layout) self.bannerbox = self.factory.createHBox(self.box) self.banner = self.factory.createImage( self.bannerbox, "/usr/share/isodumper/header.svg" ) self.devicebox = self.factory.createHBox(self.box) self.devicelist = self.factory.createComboBox( self.devicebox, _("Select the device to work on:"), False ) self.devicelist.setNotify() self.devicelist.setStretchable(0, True) self.refreshbt = self.factory.createPushButton(self.devicebox, _("Update list")) self.refreshbt.setStretchable(0, True) self.operation_frame = self.factory.createFrame( self.box, _("Select operations") ) self.operationbox = self.factory.createVBox(self.operation_frame) self.labelbox = self.factory.createHBox(self.operationbox) self.labelbox.setStretchable(0, True) self.factory.createLabel( self.labelbox, "\n".join( [ _("The selected operations will be executed in order from top to bottom."), _("If both write image and create partition are selected, the partition will be created in the free space after the image."), ]), ).setStretchable(0, True) self.backupbox = self.factory.createHBox(self.operationbox) self.backup_cbox = self.factory.createCheckBox( self.backupbox, _("Backup the device to:") ) self.backup_cbox.setNotify() # add file picker for backup file name self.backup_select = self.factory.createPushButton( self.backupbox, self.ChooseImage ) self.backup_select.setStretchable(0, True) self.factory.createHStretch(self.backupbox) self.writebox = self.factory.createHBox(self.operationbox) self.write_cbox = self.factory.createCheckBox( self.writebox, _("Write Image from:") ) self.write_cbox.setNotify() # add file picker for image file self.ima = self.factory.createPushButton(self.writebox, self.ChooseImage) self.ima.setStretchable(0, True) self.factory.createHStretch(self.writebox) # self.writebt = self.factory.createPushButton(self.writebox, _("&Write to device")) self.partitionbox = self.factory.createHBox(self.operationbox) self.partition_cbox = self.factory.createCheckBox( self.partitionbox, _("Create partition of type:") ) self.partition_cbox.setNotify() self.partition_cb = self.factory.createComboBox(self.partitionbox, _("Type:")) self.partition_cb.addItem("", True) self.format_type = { "fat32": _("FAT32"), "ext4": _("ext4"), "ntfs": _("NTFS"), "exfat": _("exFAT"), "persist": _("Persistent partition"), } for key in self.format_type: self.partition_cb.addItem(self.format_type[key], False) self.partition_cb.setNotify() self.partition_label = self.factory.createInputField( self.partitionbox, _("Label:") ) self.partition_label.setStretchable(0, True) self.persistencebox = self.factory.createHBox(self.operationbox) self.cryptcb = self.factory.createCheckBox( self.persistencebox, _("Encrypt partition using LUKS, with key:") ) self.cryptcb.setNotify() self.cryptkey = self.factory.createInputField( self.persistencebox, _("Key:"), passwordMode=True ) self.cryptkey.setStretchable(0, True) self.execute_frame = self.factory.createFrame(self.box, _("Execution")) self.execute_framevb = self.factory.createVBox(self.execute_frame) self.execute_label = self.factory.createLabel( self.execute_framevb, _("When you are sure all options are correct, start:") ) self.start_bt = self.factory.createPushButton( self.execute_framevb, _("Execute") ) self.start_bt.setNotify() self.progressbox = self.factory.createHBox(self.execute_framevb) self.progress = self.factory.createProgressBar( self.progressbox, _("Progress"), 100 ) self.progress.setStretchable(0, True) self.reportbox = self.factory.createHBox(self.box) self.reportbox.setWeight(1, 30) self.logview = self.factory.createLogView(self.reportbox, _("Report"), 10) self.logview.setStretchable(0, True) self.buttonsbox = self.factory.createHBox(self.box) self.aboutbt = self.factory.createPushButton(self.buttonsbox, _("About")) self.aboutbt.setStretchable(0, True) self.helpbt = self.factory.createPushButton(self.buttonsbox, _("Help")) self.helpbt.setStretchable(0, True) self.quitbt = self.factory.createPushButton(self.buttonsbox, _("Quit")) self.quitbt.setStretchable(0, True) # Connect events self.eventManager.addWidgetEvent(self.quitbt, self.confirm_close) self.eventManager.addWidgetEvent(self.ima, self.ask_image) self.eventManager.addWidgetEvent(self.cryptcb, self.check_encryt) self.eventManager.addWidgetEvent(self.backup_select, self.backup_choosed) self.eventManager.addWidgetEvent(self.backup_cbox, self.select_backup) self.eventManager.addWidgetEvent(self.write_cbox, self.select_write) self.eventManager.addWidgetEvent(self.partition_cbox, self.select_partition) self.eventManager.addWidgetEvent(self.partition_cb, self.select_partition_type) self.eventManager.addWidgetEvent(self.devicelist, self.device_selected) self.eventManager.addWidgetEvent(self.start_bt, self.start) self.eventManager.addWidgetEvent(self.refreshbt, self.update_list) self.eventManager.addWidgetEvent(self.helpbt, self.help_dialog) self.eventManager.addWidgetEvent(self.aboutbt, self.aboutDialog) # Set the initial state, active, disabled # self.initial_state() self.dialog.recalcLayout() self.uEventQueue = SimpleQueue() if yui.YUI.app().isTextMode(): self.glib_loop = GLib.MainLoop() self.glib_thread = threading.Thread( target=self.glib_mainloop, args=(self.glib_loop,) ) self.glib_thread.start() self.u.iface["org.freedesktop.DBus.ObjectManager"].InterfacesAdded.connect( self.device_changed ) self.u.iface["org.freedesktop.DBus.ObjectManager"].InterfacesRemoved.connect( self.device_changed ) def glib_mainloop(self, loop): """ thread function for glib main loop """ loop.run() def device_changed(self, a, b): if not self.operation: if "drives" in a.split("/"): self.uEventQueue.put({"event": "device-changed", "value": True}) def ask_image(self): self.img_name = yui.YUI.app().askForExistingFile( "", "*.iso *.img", self.ChooseImage ) if self.img_name != "": # Check if the image has a signed checksum self.get_sum(self.img_name) if self.signature_found: if not self.signature_checked: info = Info( _("Warning"), True, _( "The validation of the GPG signature of the checksum file failed !" ) + "\n" + _("Do you want to continue?"), ) if self.ask_YesOrNo(info): pass else: self.img_name = "" return else: self.logger.info(_("The checksum file is signed")) else: info = Info( _("Warning"), True, _( "No GPG signature has been found or the key is expired. Are you sure you want to use this image?" ), ) if self.ask_YesOrNo(info): pass else: self.img_name = "" return self.ima.setLabel(os.path.basename(self.img_name)) self.dialog.recalcLayout() self.activate_devicelist() self.image_is_selected = True else: self.image_is_selected = False self.interface_active_state() def interface_active_state(self): # Determine the state of the Execute button and others, if they can be active or not if self.devicelist.selectedItem() is not None: self.partition_cbox.setEnabled() self.backup_cbox.setEnabled() self.write_cbox.setEnabled() else: self.partition_cbox.setDisabled() self.partition_cbox.setChecked(False) self.backup_cbox.setDisabled() self.backup_cbox.setChecked(False) self.write_cbox.setDisabled() self.write_cbox.setChecked(False) self.cryptcb.setChecked(False) backup_ready = self.backup_cbox.isChecked() and (self.backup_dest != "") write_ready = self.write_cbox.isChecked() and (self.img_name != "") partition_ready = self.partition_cbox.isChecked() and ( self.partition_cb.value() in self.format_type.values() ) crypt_ready = self.cryptcb.isChecked() and (self.cryptkey.value() != "") start_active = backup_ready or write_ready or partition_ready if start_active: self.start_bt.setEnabled() else: self.start_bt.setDisabled() if partition_ready: self.cryptcb.setEnabled() else: self.cryptcb.setDisabled() self.cryptkey.setDisabled() def select_backup(self): if self.backup_cbox.isChecked(): self.backup_select.setEnabled() else: self.backup_select.setDisabled() self.interface_active_state() def select_write(self): if self.write_cbox.isChecked(): self.ima.setEnabled() else: self.ima.setDisabled() self.interface_active_state() def select_partition(self): if self.partition_cbox.isChecked(): self.partition_cb.setEnabled() self.partition_label.setEnabled() else: self.partition_cb.setDisabled() self.partition_label.setDisabled() self.interface_active_state() def select_partition_type(self): print(f"Selected {self.partition_cb.value()}") if self.partition_cb.value() in (_("ext4"), _("Persistent partition")): self.cryptcb.setEnabled() if self.partition_cb.value() == _("Persistent partition"): self.partition_label.setValue("mgalive-persist") self.partition_cb.setValue("ext4") else: self.cryptcb.setDisabled() self.cryptcb.setDisabled() self.cryptkey.setDisabled() self.interface_active_state() def check_encryt(self): if self.cryptcb.isChecked(): self.cryptkey.setEnabled() else: self.cryptkey.setDisabled() self.interface_active_state() def start(self): backup_success = False if self.backup_cbox.isChecked() and not self.backup_is_selected: # check that a name is set info = Info(_("Error"), True, _("No image for backup is selected.")) self.ask_OK(info) return if self.write_cbox.isChecked(): # check that a name is set if self.image_is_selected: info = Info(_("Writing confirmation"), True, self.warning) if self.ask_YesOrNo(info): if self.deviceSize > 1024 * 1024 * 1024 * 32: info = Info( _("Warning"), True, _( "The device is bigger than 32 Gbytes. Are you sure you want use it?" ), ) if not self.ask_YesOrNo(info): return else: info = Info(_("Error"), True, _("No image to write is selected.")) self.ask_OK(info) return if self.partition_cbox.isChecked(): format_type = list(self.format_type.keys())[ list(self.format_type.values()).index(self.partition_cb.value()) ] if len(self.partition_label.value()) > self.label_length[format_type]: info = Info( _("Warning"), True, _( "Label length will be shorten to {} characters. Do you want to continue?".format(self.label_length[format_type]) ), ) if not self.ask_YesOrNo(info): return if self.backup_cbox.isChecked() and self.backup_is_selected: backup_success, message = self.backup_go() if not backup_success: # something went wrong, we stop self.initial_state() return self.logger.info(message) if self.write_cbox.isChecked() and self.image_is_selected: self.do_write() self.image_is_selected = False return if self.partition_cbox.isChecked(): # Create a partition without writing image, will use all the device space. format_success, message = self.do_format() if format_success: self.logger.info(message) self.success() else: self.logger.warning(message) self.emergency(message) elif backup_success: # success message when no other operation after backup self.progress.setEnabled() self.progress.setValue(100) self.dialog.pollEvent() self.success() self.initial_state() def ask_format(self): atelier = yui.YUI.widgetFactory() dialog = atelier.createPopupDialog() vb = atelier.createVBox(dialog) label = atelier.createInputField(vb, _("Label for the device:")) cr = atelier.createRadioButtonGroup(vb) vb_c = atelier.createVBox(cr) vb_c1 = atelier.createHBox(vb_c) format_fat = atelier.createRadioButton( atelier.createLeft(vb_c1), _("FAT32 (Windows)") ) vb_c4 = atelier.createHBox(vb_c) format_exfat = atelier.createRadioButton( atelier.createLeft(vb_c4), _("exFAT (Windows)") ) vb_c2 = atelier.createHBox(vb_c) format_ntfs = atelier.createRadioButton( atelier.createLeft(vb_c2), _("NTFS (Windows)") ) vb_c3 = atelier.createHBox(vb_c) format_ext = atelier.createRadioButton( atelier.createLeft(vb_c3), _("ext4 (Linux)") ) bb = atelier.createHBox(vb) executebt = atelier.createPushButton(bb, _("Execute")) cancelbt = atelier.createPushButton(bb, _("Cancel")) dialog.open() returncode = True while True: event = dialog.waitForEvent() if event.eventType() == yui.YEvent.CancelEvent: returncode = False break if event.widget() == executebt: if format_fat.value(): format_type = "fat32" format_label = label.value().upper()[:11] break if format_exfat.value(): format_type = "exfat" format_label = label.value()[:32] break if format_ntfs.value(): format_type = "ntfs" format_label = label.value()[:32] break if format_ext.value(): format_type = "ext4" format_label = label.value() break if event.widget() == cancelbt: returncode = False break dialog.destroy() if returncode: return True, format_type, format_label else: return False, " ", " " def ask_OK(self, info): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory( yui.YExternalWidgets.externalWidgetFactory("mga") ) dlg = mgafactory.createDialogBox(yui.YMGAMessageBox.B_ONE) dlg.setTitle(info.title) dlg.setText(info.text, info.richtext) dlg.setButtonLabel(_("OK"), yui.YMGAMessageBox.B_ONE) dlg.setMinSize(60, 10) return dlg.show() == yui.YMGAMessageBox.B_ONE def ask_YesOrNo(self, info): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory( yui.YExternalWidgets.externalWidgetFactory("mga") ) dlg = mgafactory.createDialogBox(yui.YMGAMessageBox.B_TWO) dlg.setTitle(info.title) dlg.setText(info.text, info.richtext) dlg.setButtonLabel(_("Yes"), yui.YMGAMessageBox.B_ONE) dlg.setButtonLabel(_("No"), yui.YMGAMessageBox.B_TWO) dlg.setMinSize(60, 8) return dlg.show() == yui.YMGAMessageBox.B_ONE def aboutDialog(self): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory( yui.YExternalWidgets.externalWidgetFactory("mga") ) dlg = mgafactory.createAboutDialog( "Isodumper", version.RELEASE, "GPLv2", _("Oliver Grawert
Papoteur
Pictures : Timothée Giet"), _("A tool for writing ISO images to a device") + "
http://gitweb.mageia.org/software/isodumper", "", ) dlg.show() def nodevDialog(self): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory( yui.YExternalWidgets.externalWidgetFactory("mga") ) dlg = mgafactory.createDialogBox(yui.YMGAMessageBox.B_TWO) dlg.setTitle("IsoDumper") dlg.setText( _( "Warning\nNo target devices were found.\nYou need to plug in a USB Key to which the image can be written." ) ) dlg.setButtonLabel(_("Refresh"), yui.YMGAMessageBox.B_ONE) dlg.setButtonLabel(_("Cancel"), yui.YMGAMessageBox.B_TWO) dlg.setMinSize(60, 8) return dlg.show() == yui.YMGAMessageBox.B_ONE def doSomethingIntoLoop(self): self.traitement = None # if event.eventType() == yui.YEvent.CancelEvent: # self.confirm_close() # break if self._start: self._start = False self.initial_state() try: item = self.uEventQueue.get_nowait() if item["event"] == "device-changed": self.update_list_on_event() except Exception as e: pass def run(self, debug=False): self.backupTitle = yui.YUI.app().applicationTitle() yui.YUI.app().setApplicationTitle(self._title) try: self.u = UDisks2() except: message = _("UDisks2 is not available on your system") self.logger.error(message) self.emergency(message) self._running = False return self._setupUI() # setting to False will break the event loop self.logger = Logging(debug, self.logview) self._running = True self._start = False self.timeout = 100 self._start = True try: self._handleEvents() except Exception: import traceback traceback.print_exc() yui.YDialog.deleteAllDialogs() # Closing # restore old application title if hasattr(self, 'iface'): self.iface.close() yui.YUI.app().setApplicationTitle(self.backupTitle) self.dialog.destroy() self.dialog = None if yui.YUI.app().isTextMode(): self.glib_loop.quit() self.glib_thread.join() if __name__ == "__main__": # Send the yui logging outside of the console log = yui.YUILog.setLogFileName("/dev/null") parser = ParseCLI("isodumper") parser.parser.add_argument( "-d", "--debug", help=_("allow debug information"), action="store_true" ) if parser.args.version: print(version.RELEASE) else: if parser.args.debug: app = IsoDumper(debug=True) else: app = IsoDumper() app.run() # next line seems to be a workaround to prevent the qt-app from crashing # see https://github.com/libyui/libyui-qt/issues/41 yui.YUILoader.deleteUI()