# coding:utf-8 # !/usr/bin/env python3 # # Copyright (c) 2007-2009 Canonical Ltd. # # Author: Oliver Grawert # # Modifications 2013 from papoteur # and Geiger David # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. # Requires python3-parted import sys import importlib try: from isodumper import version except: import version ########### # imports # ########### import yui import time import os import re import gettext from subprocess import Popen, PIPE from pydbus import SystemBus import gnupg import datetime import logging import threading from gi.repository import GLib from queue import SimpleQueue import psutil import manatools.args as args import manatools.ui.common as common import manatools.ui.basedialog as basedialog class NoUDisks2(Exception): pass class UDisks2(object): BLOCK = 'org.freedesktop.UDisks2.Block' DRIVE = 'org.freedesktop.UDisks2.Drive' SERVICE = 'org.freedesktop.UDisks2' def __init__(self): self.bus = SystemBus() try: self.iface = self.bus.get(self.SERVICE) except: raise NoUDisks2() self._udisks2_obj_manager = self.iface["org.freedesktop.DBus.ObjectManager"] def find_devices(self): objects = self._udisks2_obj_manager.GetManagedObjects() re_drive = re.compile("(?P.*?/drives/(?P.*))") re_block = re.compile("(?P.*?/block_devices/(?P.*))") devs = [m.groupdict() for m in [re_drive.match(path) for path in (objects.keys())] if m] blocks = [m.groupdict() for m in [re_block.match(path) for path in (objects.keys())] if m] list = [] for block in blocks: if not (self.SERVICE + '.Partition' in objects[block['path']]): # we skip partition blocks for dev in devs: if dev['path'] == objects[block['path']][self.BLOCK]['Drive']: # we look for device which is referenced in Drive attribute of the current block dev_obj = objects[dev['path']][self.DRIVE] if (dev_obj['ConnectionBus'] == 'usb' or dev_obj['ConnectionBus'] == 'sdio') and \ (dev_obj['Removable'] == 1 or dev_obj[ 'MediaRemovable'] == 1): # we keep only blocks where device ConnectionBus is sdio or usb item = [] vend = dev_obj['Vendor'] if vend == "": name = dev_obj['Model'] else: name = vend + " " + dev_obj['Model'] path = '/dev/' + block['path'].split('/')[-1] size = dev_obj['Size'] item.append(name) item.append(path) item.append(size) list.append(item) return list def do_unmount(self, target): # keep only the end, like sdb target = os.path.basename(target) objects = self._udisks2_obj_manager.GetManagedObjects() re_block = re.compile('(?P.*?/block_devices/(?P.*))') blocks = [m.groupdict() for m in [re_block.match(path) for path in (objects.keys())] if m] for block in blocks: if (self.SERVICE + '.Partition' in objects[block['path']]): # we skip non partition blocks # Table should be something like '/org/freedesktop/UDisks2/block_devices/sdb' if objects[block['path']][self.SERVICE + '.Partition']['Table'].split('/')[-1] == target: if (self.SERVICE + '.Encrypted' in objects[block['path']]): # Is the partition encrypted ? path_to_encrypted = objects[block['path']][self.SERVICE + '.Encrypted']['CleartextDevice'] if path_to_encrypted != '/': iface = self.bus.get(self.SERVICE, path_to_encrypted) fs = iface[self.SERVICE + '.Filesystem'] if fs.MountPoints: # partition is mounted try: fs.Unmount({}) except GLib.GError as e: print(str(e)) return False, _("A partition is busy. Try to free it before starting again.") except: raise Exception(str(e)) iface = self.bus.get(self.SERVICE, block['path']) fs = iface[self.SERVICE + '.Encrypted'] fs.Lock({}) logging.debug(f"Unmounting encryted {block['path']}") else: iface = self.bus.get(self.SERVICE, block['path']) if self.SERVICE + '.Filesystem' in objects[block['path']]: fs = iface[self.SERVICE + '.Filesystem'] if fs.MountPoints: # partition is mounted logging.debug(f"Unmounting {block['path']}") try: fs.Unmount({}) except GLib.GError as e: print(str(e)) return False, _("A partition is busy. Try to free it before starting again.") except: raise Exception(str(e)) else: logging.debug(f"Not mounted {block['path']}") return True, "" def get_partitions(self, target): # keep only the end, like sdb target = os.path.basename(target) partitions = [] objects = self._udisks2_obj_manager.GetManagedObjects() re_block = re.compile('(?P.*?/block_devices/(?P.*))') blocks = [m.groupdict() for m in [re_block.match(path) for path in (objects.keys())] if m] for block in blocks: if (self.SERVICE + '.Partition' in objects[block['path']]): # we skip non partition blocks # Table should be something like '/org/freedesktop/UDisks2/block_devices/sdb' if objects[block['path']][self.SERVICE + '.Partition']['Table'].split('/')[-1] == target: partition = objects[block['path']][self.SERVICE + '.Block'] partitions.append( {'device': bytes(partition['Device']).decode("UTF-8", "replace").strip("\0"), 'label': partition['IdLabel'], 'type': partition['IdType']}) return partitions def eject(self, device): ''' device is expected like /dev/sda''' block = os.path.basename(device) iface = self.bus.get(self.SERVICE, '/org/freedesktop/UDisks2/block_devices/' + block) drive = iface.Get(self.BLOCK, "Drive") i_drive = self.bus.get(self.SERVICE, drive) i_drive.Eject({}) class Info(object): def __init__(self, title, richtext, text): self.title = title self.richtext = richtext self.text = text class ParseCLI(args.AppArgs): def __init(self, command): super().__init__(command) class IsoDumper(basedialog.BaseDialog): def get_devices(self, selected=None): self.list = self.u.find_devices() while len(self.list) == 0: if self.nodevDialog(): self.list = self.u.find_devices() else: return False self.devicelist.addItem("", False) if len(self.list) > 0: for name, path, size in self.list: if size != 0: label = str(name + ' (' + path.lstrip() + ') ' + self.sizeof_fmt(size)) sel = (label == selected) if selected else False self.devicelist.addItem(label, sel) return True def udev_wait(self, operation): wait = Popen(["udevadm", "settle", "--timeout=15"], stderr=PIPE) wait.communicate() working = True while working: time.sleep(0.5) wait.poll() rc = wait.returncode if not (rc is None): wait = None working = False if rc != 0: self.return_state = False self.return_message = _("Timeout reached when {}".format(operation)) return False return True def update_list(self): self.devicelist.deleteAllItems() self.get_devices() self.img_name = "" self.ima.setLabel(self.ChooseImage) self.backup_select.setLabel(self.ChooseImage) self.dialog.recalcLayout() def update_list_on_event(self): selitem = self.devicelist.selectedItem().label() self.devicelist.deleteAllItems() self.get_devices(selected = selitem) if self.devicelist.selectedItem().label() != selitem: self.restore() def refresh_signal(self, device, params): self.devicelist.deleteAllItems() self.get_devices() self.dialog.recalcLayout() def device_selected(self): selitem = self.devicelist.selectedItem() if selitem is not None: self.dev = selitem.label() if self.dev != "": for name, path, size in self.list: if self.dev.startswith(name + ' (' + path.lstrip()): self.deviceSize = size self.device_name = name.rstrip().replace(' ', '') message = _('Target Device: {}').format(self.dev) self.logger(message) logging.info(message) partitions = self.u.get_partitions(self.dev.split('(')[1].split(')')[0]) # I18N: verb in singular 3rd person self.logger(_("Contains this/these partition(s)")) for partition in partitions: self.logger(_("{device}: Type={type}, Label={label}").format(device=partition['device'], type=partition['type'], label=partition['label'],)) if len(partitions) == 0: # I18N: None refers to partition (no partitions on the selected device) self.logger(_("None")) self.partition_cbox.setEnabled() self.partition_cbox.setChecked(False) self.backup_cbox.setEnabled() self.backup_cbox.setChecked(False) self.write_cbox.setEnabled() self.write_cbox.setChecked(False) self.cryptcb.setChecked(False) break else: self.partition_cbox.setDisabled() self.partition_cbox.setChecked(False) self.backup_cbox.setDisabled() self.backup_cbox.setChecked(False) self.write_cbox.setDisabled() self.write_cbox.setChecked(False) self.cryptcb.setChecked(False) @staticmethod def sizeof_fmt(num): # I18N these are units for files size for unit in [_('B'), _('KiB'), _('MiB'), _('GiB'), _('TiB')]: if abs(num) < 1024.0: return "%3.3f %s" % (num, unit) num /= 1024.0 return "%.1f %s" % (num, _('TiB')) def backup_choosed(self): # Add .img if not specified self.backup_img_name = yui.YUI.app().askForSaveFileName("", "*.img", _("Backup to:")) if self.backup_img_name != '': if not self.backup_img_name.lower().endswith('.img'): self.backup_img_name = self.backup_img_name + ".img" head, tail = os.path.split(self.backup_img_name) self.backup_dest = self.backup_img_name self.backup_select.setLabel(tail) self.dialog.recalcLayout() self.interface_active_state() self.backup_is_selected = True else: self.backup_is_selected = False def get_sum(self, source): self.return_state = False self.signature_checked = False logging.debug("Starting getting sum") # Check if the sum file has a valid signature gpg = gnupg.GPG() gpg.encoding = 'utf-8' # Use Mageia public key mageia_keyid = "835E41F4EDCA7A90" self.sum_type = 'sha3' sig_file = "{}.{}.gpg".format(source, self.sum_type) self.source_file = "{}.{}".format(source, self.sum_type) keys_list = gpg.list_keys() key_present = False for entry in keys_list: if (mageia_keyid == entry['keyid']): if entry['expires'] and (datetime.datetime.now().timestamp() > float(entry['expires'])): logging.info("Mageia key expired, reloading") else: logging.info("Mageia key already present") key_present = True break try: if not key_present: gpg.recv_keys('pool.sks-keyservers.net', mageia_keyid) self.sum_check_searched = True with open(sig_file, 'rb') as g: self.signature_found = True verified = gpg.verify_file(g, close_file=False) if verified: self.signature_checked = True logging.debug("signature checked") g.close() else: g.seek(0) verified = gpg.verify_file(g, self.source_file) if verified: self.signature_checked = True logging.debug("Detached signature is OK") else: self.signature_checked = False logging.warning("Signature is false") except Exception as e: self.signature_found = False logging.error(str(e)) logging.info(_("Signature file {} not found\n" + _("or key expired")).format(sig_file)) def do_format(self): #code, format_type, name = self.ask_format() if self.partition_cb.value() != "": self.operation = True format_type = list(self.format_type.keys())[list(self.format_type.values()).index(self.partition_cb.value())] if format_type == 'persist': self.emergency(_("Persistence partition is to use when writing a Live ISO image.")) return target = self.dev.split('(')[1].split(')')[0] info = Info(_("Formatting confirmation in {}".format(self.partition_cb.value())), True, self.warning) if self.ask_YesOrNo(info): name = self.partition_label.value() if format_type == 'fat32': name = name.upper()[:11] elif format_type == 'exfat' or format_type == 'ntfs': name = name[:32] rc = self.raw_format(target, format_type, name) self.operation = False if rc == 0: message = _('The device was formatted successfully.') self.logger(message) self.success() elif rc == 5: message = _("An error occurred while creating a partition.") self.logger(message) self.emergency(message) elif rc == 127: message = _('Authentication error.') self.logger(message) self.emergency(message) else: message = _('An error {} occurred.'.format(rc)) self.emergency(message) self.initial_state() def initial_state(self): self.update_list() self.image_is_selected = False self.backup_is_selected = False self.backup_select.setDisabled() self.backup_cbox.setDisabled() self.backup_dest = "" self.backup_select.setLabel(self.ChooseImage) self.partition_cbox.setDisabled() self.ima.setDisabled() self.img_name = "" self.ima.setLabel(self.ChooseImage) self.write_cbox.setDisabled() self.devicelist.setEnabled() self.progress.setLabel(_("Progress")) self.progress.setValue(0) self.progress.setDisabled() self.refreshbt.setEnabled() self.partition_cb.setDisabled() self.partition_label.setDisabled() self.partition_label.setValue("") self.cryptcb.setDisabled() self.cryptkey.setDisabled() self.cryptkey.setValue("") self.start_bt.setDisabled() def raw_format(self, usb_path, fstype, label): self.u.do_unmount(usb_path) self.udev_wait(_("unmounting")) self.operation = True if os.geteuid() > 0: launcher = 'pkexec' self.process = Popen( [launcher, '/usr/bin/python3', '-u', '/usr/lib/isodumper/raw_format.py', '-d', usb_path, '-f', fstype, '-l', label, '-u', str(os.geteuid()), '-g', str(os.getgid())], shell=False, stdout=PIPE, preexec_fn=os.setsid) else: self.process = Popen( ['/usr/bin/python3', '-u', '/usr/lib/isodumper/raw_format.py', '-d', usb_path, '-f', fstype, '-l', label, '-u', str(os.geteuid()), '-g', str(os.getgid())], shell=False, stdout=PIPE, preexec_fn=os.setsid) working = True while working: time.sleep(0.5) self.process.poll() rc = self.process.returncode if rc is None: working = True else: self.process = None working = False return rc def backup_go(self): self.wip_unsensitive() self.progress.setEnabled() self.progress.setLabel("Backup") dest = self.backup_img_name if os.path.exists(dest): info = Info(_("Backup confirmation"), True, _("Do you want to overwrite the file?")) if not (self.ask_YesOrNo(info)): self.returncode = 1 return False st = os.statvfs(os.path.dirname(dest)) free = st.f_bavail * st.f_frsize if free < self.deviceSize: sizeM = str(self.deviceSize / (1024 * 1024)) message = _("The destination directory is too small to receive the backup (%s Mb needed)") % (sizeM) self.logger(message) logging.warning(message) self.emergency(message) self.initial_state() return False else: self.operation = True self.returncode = 0 source = self.dev.split('(')[1].split(')')[0] message = _('Backup to: {}').format(dest) self.logger(message) logging.info(message) # Writing step self.iface.do_write(source, dest, self.deviceSize) while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(.2) success, message = self.iface.end() if success: #: don't translate source or target message = _('{source} successfully written to {target}').format(source=source.split('/')[-1], target=dest) self.logger(message) logging.info(message) self.progress.setEnabled() self.progress.setValue(100) self.dialog.pollEvent() self.logger(message) logging.info(message) self.success() self.initial_state() else: self.emergency(message) self.initial_state() return False return True def do_write(self): self.wip_unsensitive() self.progress.setEnabled() self.progress.setLabel("Writing") source = self.img_name target = self.dev.split('(')[1].split(')')[0] b = os.path.getsize(source) if b > (self.deviceSize): message = _('The device is too small to contain the ISO file.') self.logger(message) logging.error(message) self.emergency(message) else: info = Info(_("Writing confirmation"), True, self.warning) if self.ask_YesOrNo(info): if self.deviceSize > 1024 * 1024 * 1024 * 32: info = Info(_("Warning"), True, _('The device is bigger than 32 Gbytes. Are you sure you want use it?')) if self.ask_YesOrNo(info): pass else: return #: don't translate source or target self.progress.setLabel(_('Writing {source} to {target}').format(source=source.split('/')[-1], target=target.split('/')[-1])) message = _('Executing copy from {source} to {target}').format(source=source, target=target) self.operation = True self.logger(message) logging.info(message) success, message = self.u.do_unmount(target) self.udev_wait(_("unmounting")) if success: # Writing step self.iface.do_write(source, target, b) progress = self.iface.progress while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(.5) success, message = self.iface.end() if success: #: don't translate source or target message = _('Image {source} written to {target}').format(source=source.split('/')[-1], target=target) message += "\n" + _('Bytes written: {}').format(str(b)) self.logger(message) logging.info(message) self.progress.setLabel(_('Checking ') + target.split('/')[-1]) self.progress.setValue(0) self.dialog.pollEvent() # Checking self.iface.check_write(target, source) progress = 0 while progress < 100: self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(.5) progress = self.iface.progress nowarning, message = self.iface.end() self.progress.setEnabled() self.logger(message) logging.info(message) # Add persistent partition if asked if self.partition_cbox.isChecked() and self.partition_cb.value() == "ext4": self.progress.setLabel(_("Adding persistent partition")) self.progress.setValue(0) if self.cryptcb.isChecked(): if self.cryptkey.value() == "": message = _("No key for encrypted partition provided. Adding the partition aborted.") self.logger(message) logging.warning(message) else: self.iface.do_persistence(target, self.partition_label.value(), self.cryptkey.value()) while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.progress.setLabel(self.iface.message) self.dialog.pollEvent() time.sleep(.5) if self.iface.state: message = _("Added encrypted persistent partition") self.logger(message) logging.warning(message) else: self.logger(_("Adding encrypted persistent partition failed")) self.logger(self.iface.message) nowarning = False else: self.iface.do_persistence(target, self.partition_label.value(), "") while not self.iface.done: progress = self.iface.progress self.progress.setValue(progress) self.dialog.pollEvent() time.sleep(.5) if self.iface.state: message = _("Added persistent partition") else: message = _("Adding persistent partition failed") self.logger(self.iface.message) nowarning = False self.logger(message) logging.warning(message) # Unmount if partitions are automatically mounted and then eject self.progress.setValue(100) self.dialog.pollEvent() success, message = self.u.do_unmount(target) self.udev_wait(_("unmounting")) if success: self.u.eject(target) if nowarning : self.success() else: self.warn() else: self.emergency(message) self.initial_state() else: self.emergency(message) self.initial_state() else: self.emergency(message) self.initial_state() else: self.initial_state() self.operation = False def success(self): self.operation = False self.final_unsensitive() info = Info(_("Success"), True, _("The operation completed successfully.") + "\n"\ + _("You are free to unplug it now, a logfile \n\ /home/-user-/.isodumper/isodumper.log has been saved.")+ "\n"\ + _("You may also consult /var/log/magiback.log")) if self.ask_OK(info): return def warn(self): self.operation = False self.final_unsensitive() info = Info(_("Warning"), True, _("The operation completed, but with anomalies.\n\ Check carefully the messages in log view") + "\n"\ + _("You are free to unplug it now, a logfile \n\ /home/-user-/.isodumper/isodumper.log has been saved.") + "\n"\ + _("You may also consult /var/log/magiback.log")) if self.ask_OK(info): return def confirm_close(self, *args): if not self.operation: # no writing , backup nor format running self.close() else: # writing , backup or format running info = Info(_("Warning"), True, _("Writing is in progress. Exiting during writing \n\ will make the device or the backup unusable.\n\ Are you sure you want to quit during writing?")) if self.ask_YesOrNo(info): return True else: return False def emergency(self, message): self.operation = False self.returncode = 1 self.final_unsensitive() info = Info(_("Error"), True, message) self.ask_OK(info) def final_unsensitive(self): self.ima.setDisabled() self.devicelist.setDisabled() self.write_cbox.setDisabled() self.backup_cbox.setDisabled() self.progress.setDisabled() self.backup_select.setDisabled() self.partition_cbox.setDisabled() def wip_unsensitive(self): self.ima.setDisabled() self.devicelist.setDisabled() self.backup_cbox.setDisabled() self.backup_select.setDisabled() self.write_cbox.setDisabled() self.refreshbt.setDisabled() self.partition_cbox.setDisabled() self.partition_cb.setDisabled() self.partition_label.setDisabled() self.cryptcb.setDisabled() self.cryptkey.setDisabled() self.start_bt.setDisabled() def close(self): # to exit from _handleEvents loop self._running = False def logger(self, text): self.logview.appendLines(text + "\n") def activate_devicelist(self): self.devicelist.setEnabled() message = _('ISO Image to copy: ') + self.img_name self.logger(message) logging.warning(message) # self.chooser.set_tooltip_text(self.img_name) def help_dialog(self): info = Info(_("IsoDumper"), True, _("Mageia IsoDumper") + "
\ ----------------
" + _("This GUI program is primarily for safely writing a bootable ISO image to a USB flash drive, \ an operation devious & potentially hazardous when done by hand. As a bonus, it can also back up the \ entire previous contents of the flash drive onto the hard disc, and restore the flash drive \ to its previous state subsequently.") + "
" + _("It gives also a feature for formatting the USB device.") + "

" + _( "IsoDumper can be launched either from the menus, or a user console with the command 'isodumper'.") + "
" + _( "The root password is solicited when this is necessary for the program's operation.") + "
" + _("The flash drive can be inserted beforehand or once the program is started. In the latter case, a \ dialogue will say that there is no flash drive inserted, and allow a 'retry' to find it once it is.
\ (You may have to close any automatically opened File Manager window).") + "

" + _("The fields of the main window are as follows:
\ - Device to work on: the device of the USB flash drive, a drop-down list to choose from.
\ - Write Image: to choose the source ISO image *.iso (or flash drive backup file *.img) to write out.
\ - Write to device: This button launches the operation - with a prior warning dialogue.") + "
" + _( "- Add a persistent partition: the remaining space will be used in a new partition where data from the Live system can be written and recovered between sessions.") + "
" + _( "- Encrypt: the persistent partition will be encrypted with the key provided in Key field.") + "
" + _("The operation is shown in the progress bar beneath.") + "
" + _("- Backup to: define the name and placement of the backup image file. The current flash drive \ will be backed up to a disc file. Note that the entire flash drive is preserved, regardless of its \ actual contents; ensure that you have the necessary free disc space (the same size as the USB device). \ This backup file can be used later to restore the flash drive by selecting it as the source *.img file to write out.") + "
" + _("- Backup the device: launch the backup operation.") + "
" + _("- Format the device: create an unique partition on the entire volume in the specified format in FAT, \ exFAT, NTFS or ext. You can specify a volume name and the format in a new dialog box.") + "
") if self.ask_OK(info): return def __init__(self, debug=False): #: placeholder for release number APP = "isodumper" DIR = "/usr/share/locale" # Call translation catalog gettext.install(APP, localedir=DIR, names=('ngettext',)) # Check that there is no other instance running current_pid = psutil.Process().pid proc_iter = psutil.process_iter(attrs=["pid", "name"]) for p in proc_iter: if (p.info["name"] == APP) and (p.info["pid"] != current_pid): info = Info(_("Error"), True, _("There is another instance of Isodumper already running.")) self.ask_OK(info) yui.YUILoader.deleteUI() self._running = False return # TODO Read log level from command line level = logging.DEBUG if debug else logging.ERROR logpath = os.path.join(os.path.expanduser('~'), '.isodumper') if not (os.path.isdir(logpath)): os.mkdir(logpath) logging.basicConfig(filename=os.path.join(logpath,"isodumper.log"), format='%(asctime)s %(levelname)-8s %(message)s', level=level) # define size of the selected device self.deviceSize = 0 # Operation running self.operation = False # Prepare interface on DBus self.bus = SystemBus() self.iface = self.bus.get("org.mageia.Magiback", "Isodumper") self.ChooseImage = _("Choose an image") self.warning = _("Warning\nThis will destroy all data on the target device,\n\ are you sure you want to proceed?\n\ If you say ok here, please do not unplug\ the device during the following operation.") """ Init/Constructor for the 'widgets' """ self.dialog = basedialog.BaseDialog.__init__(self, _("Isodumper {}").format(version.RELEASE), "", basedialog.DialogType.POPUP, 100, 20) yui.YUI.app().setApplicationIcon("/usr/share/icons/isodumper.png") def UIlayout(self, layout): # create the main gui # +---------------------+ # + banner + # +---------------------+ # | devicebox | # +---+-----+------+----+ # + L | ima | writebt | # +---+-----+------+----+ # + L | backup_select | backupbt | # +---+-----+------+----+ # + F | formatbt | # +----------------+----+ # | progress | # +---------------------+ # | report | # +---------------------+ # | refreshbt | aboutbt | helpbt | quitbt | # +---------------------+ #self.ancrage = self.factory.createReplacePoint(self.dialog) self.box = self.factory.createVBox(layout) self.bannerbox = self.factory.createHBox(self.box) self.banner = self.factory.createImage(self.bannerbox, "/usr/share/isodumper/header.svg") self.devicebox = self.factory.createHBox(self.box) self.devicelist = self.factory.createComboBox(self.devicebox, _("Select the device to work on:"), False) self.devicelist.setNotify() self.devicelist.setStretchable(0, True) self.refreshbt = self.factory.createPushButton(self.devicebox, _("Update list")) self.refreshbt.setStretchable(0, True) self.operation_frame = self.factory.createFrame(self.box, _("Select operations")) self.operationbox = self.factory.createVBox(self.operation_frame) self.labelbox = self.factory.createHBox(self.operationbox) self.labelbox.setStretchable(0, True) self.factory.createLabel(self.labelbox, "The selected operations will be executed in order from top to bottom.\nIf both write image and create partition are selected, the partition\nwill be created in the free space after the image.").setStretchable(0, True) self.backupbox = self.factory.createHBox(self.operationbox) self.backup_cbox = self.factory.createCheckBox(self.backupbox, _("Backup the device to:")) self.backup_cbox.setNotify() # add file picker for backup file name self.backup_select = self.factory.createPushButton(self.backupbox, self.ChooseImage) self.backup_select.setStretchable(0, True) self.factory.createHStretch(self.backupbox) self.writebox = self.factory.createHBox(self.operationbox) self.write_cbox = self.factory.createCheckBox(self.writebox, _("Write Image from:")) self.write_cbox.setNotify() # add file picker for image file self.ima = self.factory.createPushButton(self.writebox, self.ChooseImage) self.ima.setStretchable(0, True) self.factory.createHStretch(self.writebox) #self.writebt = self.factory.createPushButton(self.writebox, _("&Write to device")) self.partitionbox = self.factory.createHBox(self.operationbox) self.partition_cbox = self.factory.createCheckBox(self.partitionbox, _("Create partition of type:")) self.partition_cbox.setNotify() self.partition_cb = self.factory.createComboBox(self.partitionbox, _("Type:")) self.partition_cb.addItem("", True) self.format_type = {'fat32': _("FAT32"), 'ext4': _("ext4"), 'ntfs': _("NTFS"), 'exfat': _("exFAT"), 'persist': _("Persistent partition"), } for key in self.format_type: self.partition_cb.addItem(self.format_type[key], False) self.partition_cb.setNotify() self.partition_label = self.factory.createInputField(self.partitionbox, _("Label:")) self.partition_label.setStretchable(0, True) self.persistencebox = self.factory.createHBox(self.operationbox) #self.persistencecb = self.factory.createCheckBox(self.persistencecol1, # _("Add a persistent partition in the remaining space")) self.cryptcb = self.factory.createCheckBox(self.persistencebox, _("Encrypt partition using LUKS, with key:")) self.cryptcb.setNotify() self.cryptkey = self.factory.createInputField(self.persistencebox, _("Key:"), passwordMode=True) self.cryptkey.setStretchable(0, True) #self.formatbt = self.factory.createPushButton(self.formatbox, _("Format the device")) self.execute_frame = self.factory.createFrame(self.box, _("Execution")) self.execute_framevb = self.factory.createVBox(self.execute_frame) self.execute_label = self.factory.createLabel(self.execute_framevb, _("When you are sure all options are correct, start:")) self.start_bt = self.factory.createPushButton(self.execute_framevb, _("Execute")) self.start_bt.setNotify() self.progressbox = self.factory.createHBox(self.execute_framevb) self.progress = self.factory.createProgressBar(self.progressbox, _("Progress"), 100) self.progress.setStretchable(0, True) self.reportbox = self.factory.createHBox(self.box) self.reportbox.setWeight(1, 30) self.logview = self.factory.createLogView(self.reportbox, _("Report"), 10) self.logview.setStretchable(0, True) self.buttonsbox = self.factory.createHBox(self.box) self.aboutbt = self.factory.createPushButton(self.buttonsbox, _("About")) self.aboutbt.setStretchable(0, True) self.helpbt = self.factory.createPushButton(self.buttonsbox, _("Help")) self.helpbt.setStretchable(0, True) self.quitbt = self.factory.createPushButton(self.buttonsbox, _("Quit")) self.quitbt.setStretchable(0, True) # Connect events self.eventManager.addWidgetEvent(self.quitbt, self.confirm_close) self.eventManager.addWidgetEvent(self.ima, self.ask_image) #self.eventManager.addWidgetEvent(self.writebt, self.do_write) self.eventManager.addWidgetEvent(self.cryptcb, self.check_encryt) #self.eventManager.addWidgetEvent(self.backupbt, self.backup_go) self.eventManager.addWidgetEvent(self.backup_select, self.backup_choosed) #self.eventManager.addWidgetEvent(self.formatbt, self.do_format) self.eventManager.addWidgetEvent(self.backup_cbox, self.select_backup) self.eventManager.addWidgetEvent(self.write_cbox, self.select_write) self.eventManager.addWidgetEvent(self.partition_cbox, self.select_partition) self.eventManager.addWidgetEvent(self.partition_cb, self.select_partition_type) self.eventManager.addWidgetEvent(self.devicelist, self.device_selected) self.eventManager.addWidgetEvent(self.start_bt, self.start) self.eventManager.addWidgetEvent(self.refreshbt, self.update_list) self.eventManager.addWidgetEvent(self.helpbt, self.help_dialog) self.eventManager.addWidgetEvent(self.aboutbt, self.aboutDialog) self.u = None try: self.u = UDisks2() except: message = _('UDisks2 is not available on your system') logging.error(message) self.emergency(message) if not self.get_devices(): self.dialog.destroy() yui.YUILoader.deleteUI() self._running = False return # Set the initial state, active, disabled self.initial_state() self.dialog.recalcLayout() self.uEventQueue = SimpleQueue() if yui.YUI.app().isTextMode(): self.glib_loop = GLib.MainLoop() self.glib_thread = threading.Thread(target=self.glib_mainloop, args=(self.glib_loop,)) self.glib_thread.start() self.u.iface["org.freedesktop.DBus.ObjectManager"].InterfacesAdded.connect(self.device_changed) self.u.iface["org.freedesktop.DBus.ObjectManager"].InterfacesRemoved.connect(self.device_changed) def glib_mainloop(self, loop): ''' thread function for glib main loop ''' loop.run() def device_changed(self, a, b): logging.debug(f"Signal {a}") if not self.operation: if 'drives' in a.split('/'): logging.debug('device-changed') self.uEventQueue.put({'event': "device-changed", 'value': True}) def ask_image(self): self.img_name = yui.YUI.app().askForExistingFile("", "*.iso *.img", self.ChooseImage) if self.img_name != "": # Check if the image has a signed checksum self.get_sum(self.img_name) if self.signature_found: if not self.signature_checked: info = Info(_("Warning"), True, _("The validation of the GPG signature of the checksum file failed !") + "\n" + _("Do you want to continue?")) if self.ask_YesOrNo(info): pass else: self.img_name = "" return else: self.logger(_("The checksum file is signed")) else: info = Info(_("Warning"), True, _('No GPG signature has been found or the key is expired. Are you sure you want to use this image?')) if self.ask_YesOrNo(info): pass else: self.img_name = "" return self.ima.setLabel(os.path.basename(self.img_name)) self.dialog.recalcLayout() self.activate_devicelist() self.image_is_selected = True else: self.image_is_selected = False self.interface_active_state() def interface_active_state(self): # Determine the state of the Execute button and others, if they can be active or not if self.devicelist.selectedItem() is not None: self.partition_cbox.setEnabled() self.backup_cbox.setEnabled() self.write_cbox.setEnabled() else: self.partition_cbox.setDisabled() self.partition_cbox.setChecked(False) self.backup_cbox.setDisabled() self.backup_cbox.setChecked(False) self.write_cbox.setDisabled() self.write_cbox.setChecked(False) self.cryptcb.setChecked(False) backup_ready = self.backup_cbox.isChecked() and (self.backup_dest != "") write_ready = self.write_cbox.isChecked() and (self.img_name != "") partition_ready = self.partition_cbox.isChecked() and (self.partition_cb.value() in self.format_type.values()) crypt_ready = self.cryptcb.isChecked() and (self.cryptkey.value() != "") start_active = backup_ready or write_ready or partition_ready if start_active: self.start_bt.setEnabled() else: self.start_bt.setDisabled() if partition_ready: self.cryptcb.setEnabled() else: self.cryptcb.setDisabled() self.cryptkey.setDisabled() def select_backup(self): if self.backup_cbox.isChecked(): self.backup_select.setEnabled() else: self.backup_select.setDisabled() self.interface_active_state() def select_write(self): if self.write_cbox.isChecked(): self.ima.setEnabled() else: self.ima.setDisabled() self.interface_active_state() def select_partition(self): if self.partition_cbox.isChecked(): self.partition_cb.setEnabled() self.partition_label.setEnabled() else: self.partition_cb.setDisabled() self.partition_label.setDisabled() self.interface_active_state() def select_partition_type(self): if self.partition_cb.value() in ("ext4" ,"Persistent partition"): self.cryptcb.setEnabled() if self.partition_cb.value() == "Persistent partition": self.partition_label.setValue("mgalive-persist") self.partition_cb.setValue('ext4') else: self.cryptcb.setDisabled() self.cryptcb.setDisabled() self.cryptkey.setDisabled() self.interface_active_state() def check_encryt(self): if self.cryptcb.isChecked(): self.cryptkey.setEnabled() else: self.cryptkey.setDisabled() self.interface_active_state() def start(self): if self.backup_cbox.isChecked(): # check that a name is set if self.backup_is_selected: if self.backup_go(): self.backup_is_selected = False else: # something went wrong, we stop return else: info = Info(_("Error"), True, _("No image for backup is selected.")) self.ask_OK(info) return if self.write_cbox.isChecked(): if self.image_is_selected: self.do_write() self.image_is_selected = False else: info = Info(_("Error"), True, _("No image to write is selected.")) self.ask_OK(info) return if self.partition_cbox.isChecked(): # Create a partition without writing image, will use all the device space. self.do_format() def ask_format(self): atelier = yui.YUI.widgetFactory() dialog = atelier.createPopupDialog() vb = atelier.createVBox(dialog) label = atelier.createInputField(vb, _("Label for the device:")) cr = atelier.createRadioButtonGroup(vb) vb_c = atelier.createVBox(cr) vb_c1 = atelier.createHBox(vb_c) format_fat = atelier.createRadioButton(atelier.createLeft(vb_c1), _("FAT32 (Windows)")) vb_c4 = atelier.createHBox(vb_c) format_exfat = atelier.createRadioButton(atelier.createLeft(vb_c4), _("exFAT (Windows)")) vb_c2 = atelier.createHBox(vb_c) format_ntfs = atelier.createRadioButton(atelier.createLeft(vb_c2), _("NTFS (Windows)")) vb_c3 = atelier.createHBox(vb_c) format_ext = atelier.createRadioButton(atelier.createLeft(vb_c3), _("ext4 (Linux)")) bb = atelier.createHBox(vb) executebt = atelier.createPushButton(bb, _("Execute")) cancelbt = atelier.createPushButton(bb, _("Cancel")) dialog.open() returncode = True while True: event = dialog.waitForEvent() if event.eventType() == yui.YEvent.CancelEvent: returncode = False break if event.widget() == executebt: if format_fat.value(): format_type = 'fat32' format_label = label.value().upper()[:11] break if format_exfat.value(): format_type = 'exfat' format_label = label.value()[:32] break if format_ntfs.value(): format_type = 'ntfs' format_label = label.value()[:32] break if format_ext.value(): format_type = 'ext4' format_label = label.value() break if event.widget() == cancelbt: returncode = False break dialog.destroy() if returncode: return True, format_type, format_label else: return False, " ", " " def ask_OK(self, info): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory(yui.YExternalWidgets.externalWidgetFactory("mga")) dlg = mgafactory.createDialogBox(yui.YMGAMessageBox.B_ONE) dlg.setTitle(info.title) dlg.setText(info.text, info.richtext) dlg.setButtonLabel(_("OK"), yui.YMGAMessageBox.B_ONE) dlg.setMinSize(60, 10) return dlg.show() == yui.YMGAMessageBox.B_ONE def ask_YesOrNo(self, info): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory(yui.YExternalWidgets.externalWidgetFactory("mga")) dlg = mgafactory.createDialogBox(yui.YMGAMessageBox.B_TWO) dlg.setTitle(info.title) dlg.setText(info.text, info.richtext) dlg.setButtonLabel(_("Yes"), yui.YMGAMessageBox.B_ONE) dlg.setButtonLabel(_("No"), yui.YMGAMessageBox.B_TWO) dlg.setMinSize(60, 8) return dlg.show() == yui.YMGAMessageBox.B_ONE def aboutDialog(self): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory(yui.YExternalWidgets.externalWidgetFactory("mga")) dlg = mgafactory.createAboutDialog("Isodumper", version.RELEASE, "GPLv2", _("Oliver Grawert
Papoteur
Pictures : Timothée Giet"), _( "A tool for writing ISO images to a device") + "
http://gitweb.mageia.org/software/isodumper", "") dlg.show() def nodevDialog(self): yui.YUI.widgetFactory mgafactory = yui.YMGAWidgetFactory.getYMGAWidgetFactory(yui.YExternalWidgets.externalWidgetFactory("mga")) dlg = mgafactory.createDialogBox(yui.YMGAMessageBox.B_TWO) dlg.setTitle("IsoDumper") dlg.setText(_( "Warning\nNo target devices were found.\nYou need to plug in a USB Key to which the image can be written.")) dlg.setButtonLabel(_("Refresh"), yui.YMGAMessageBox.B_ONE) dlg.setButtonLabel(_("Cancel"), yui.YMGAMessageBox.B_TWO) dlg.setMinSize(60, 8) return dlg.show() == yui.YMGAMessageBox.B_ONE def doSomethingIntoLoop(self): self.traitement = None #if event.eventType() == yui.YEvent.CancelEvent: #self.confirm_close() #break try: item = self.uEventQueue.get_nowait() logging.debug(f"Dealing with {item['event']}") if item['event'] == "device-changed": self.update_list_on_event() except Exception as e: pass def run(self, debug=False): self._setupUI() # setting to False will break the event loop self._running = True self.timeout = 100 try: self._handleEvents() except Exception: import traceback traceback.print_exc() yui.YDialog.deleteAllDialogs() # Closing self.dialog.destroy() self.dialog = None if yui.YUI.app().isTextMode(): self.glib_loop.quit() self.glib_thread.join() if __name__ == "__main__": # Send the yui logging outside of the console log = yui.YUILog.setLogFileName("/dev/null") parser = ParseCLI('isodumper') parser.parser.add_argument('--debug', help=_('allow debug information'), action='store_true') if parser.args.version: print(version.RELEASE) else: if parser.args.debug: app = IsoDumper(debug=True) else: app = IsoDumper() app.run() # next line seems to be a workaround to prevent the qt-app from crashing # see https://github.com/libyui/libyui-qt/issues/41 yui.YUILoader.deleteUI()