#!/usr/bin/python3
# coding: utf-8
#
#  Copyright (c) 2007-2009 Canonical Ltd.
#
#  Author: Oliver Grawert
#
#  Modifications 2013 from papoteur
#  and Geiger David
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

###########
# imports #
###########
#import locale
import os
import io
import gettext
import logging
from subprocess import call
import hashlib

try:
    import gnupg
except ImportError:
    # python-gnupg is only needed to verify the signature of the checksum
    # file; writing and hashing keep working without it.
    gnupg = None


class Dumper(object):
    """Unmount a device, write an image onto it and verify the result.

    The long-running methods report through attributes rather than return
    values: ``return_state`` (bool), ``return_message`` (str) and
    ``_progress`` (0-100).  When they are done they call
    ``self.finished.set()``.  This class never creates ``finished``; it is
    presumably a ``threading.Event`` provided by the caller or a subclass.
    ``_`` is the gettext function installed by ``__init__``.
    """

    # Largest chunk used when copying the image to the device.
    _MAX_WRITE_CHUNK = 4096 * 128
    # Chunk used when reading the device back to compute its checksum.
    _HASH_CHUNK = 1024 * 1024

    def _fail(self, message):
        """Record a failure, log it and wake up whoever waits on ``finished``."""
        logging.error(message)
        self.return_state = False
        self.return_message = message
        self.finished.set()

    @staticmethod
    def _chunk_size(b):
        """Return a copy chunk size giving more than 100 chunks for *b* bytes.

        The size is halved from ``_MAX_WRITE_CHUNK`` until the image splits
        into more than 100 chunks, so progress can advance about 1% at a
        time.  It never drops below 1 byte, which keeps tiny or empty images
        from causing a division by zero.
        """
        bs = Dumper._MAX_WRITE_CHUNK
        while bs > 1 and b // bs <= 100:
            bs //= 2
        return bs

    def _copy_image(self, src, dst, b):
        """Copy at most *b* bytes from *src* to *dst*, updating ``_progress``.

        Returns ``None`` on success or a translated error message.
        """
        bs = self._chunk_size(b)
        written = 0
        percent = 0
        while written < b:
            try:
                buf = src.read(min(bs, b - written))
            except OSError:
                return _("Reading error.")
            if not buf:
                # The source is shorter than announced: nothing left to copy.
                break
            try:
                # dst is unbuffered, so a single write() may be partial.
                view = memoryview(buf)
                while view:
                    count = dst.write(view)
                    if not count:
                        return _("Writing error.")
                    view = view[count:]
            except OSError:
                return _("Writing error.")
            written += len(buf)
            reached = written * 100 // b
            if reached > percent:
                percent = reached
                self._progress = percent
                # Flush at every progress step so the displayed percentage
                # reflects data actually sent to the device.
                try:
                    os.fsync(dst.fileno())
                except OSError:
                    return _("Writing error.")
        return None

    def _do_write(self, source, target, b):
        """Write the first *b* bytes of the file *source* onto *target*.

        source -- path of the image file
        target -- path of the device (or file) to write to
        b      -- number of bytes to write, normally the size of *source*

        The outcome is stored in ``return_state`` / ``return_message`` and
        ``finished`` is set in every case, success or failure.
        """
        self.returncode = 0
        self.operation = True
        try:
            src = io.open(source, "rb")
        except OSError:
            self._fail(_('Reading error.') + source)
            return
        with src:
            try:
                dst = io.open(target, 'wb', 0)
            except OSError:
                self._fail(_("You don't have permission to write to the device {}").format(target))
                return
            error = close_error = None
            try:
                error = self._copy_image(src, dst, b)
            finally:
                try:
                    dst.close()
                except OSError:
                    close_error = _("Writing error.")
        error = error or close_error
        if error:
            self._fail(error)
            return
        self._progress = 100
        # Publish the result before signalling, so a waiter never reads a
        # stale state.
        self.return_state = True
        self.return_message = _("Success")
        self.finished.set()

    def _do_unmount(self, target):
        """Unmount every mounted partition of the device ``target[0]``.

        target -- sequence whose first item is the device path prefix
                  (any /etc/mtab entry starting with it is unmounted)

        Returns ``(success, message)``.  *success* is True when nothing was
        mounted or every ``umount`` call returned 0.
        """
        target = target[0]
        message = _("No partition is mounted.")
        try:
            with open("/etc/mtab", "r") as mtab:
                lines = [line.strip("\n").split(" ") for line in mtab]
            mounts = [mount for mount in lines if mount[0].startswith(target)]
        except (OSError, UnicodeError, TypeError):
            # Translate first, then format, so the catalog lookup can match.
            message = _('Could not read mtab ! {} {}').format(os.getuid(), target)
            return False, message
        success = True
        if mounts:
            message = _("Unmounting all partitions of {}:\n").format(target)
            print(message)
            for mount in mounts:
                device = mount[0]
                message += _("Trying to unmount {}...\n").format(device)
                try:
                    # Argument list, no shell: the device name is never
                    # interpreted by a shell.
                    retcode = call(['umount', device])
                except OSError as e:
                    success = False
                    message += _('Execution failed: {}').format(str(e))
                    continue
                if retcode == 32:
                    message += _('Partition {} is busy').format(device)
                elif retcode < 0:
                    # A negative return code is the negated signal number.
                    message += _('Error, umount {} was terminated by signal {}').format(device, -retcode)
                elif retcode == 0:
                    message += _('{} successfully unmounted').format(device)
                else:
                    message += _('Error, umount returned {}').format(str(retcode))
                if retcode != 0:
                    success = False
            print(message)
        return success, message

    def _check_write(self, target, source):
        """Compute the SHA512 sum of what was written and compare it.

        target -- path of the device that was written
        source -- path of the image; ``<source>.sha512`` and
                  ``<source>.sha512.gpg`` are looked up next to it

        Only the first ``getsize(source)`` bytes of *target* are hashed.
        The result text goes to ``return_message``; ``finished`` is always
        set before returning.
        """
        self.return_state = False
        self.return_message = ""
        b = os.path.getsize(source)
        # Check if the sum file has a valid signature
        signature_checked = False
        sig_file = source + '.sha512.gpg'
        sumcheck = ""
        if gnupg is None:
            self.return_message = _("python-gnupg is not installed, the signature can't be checked\n")
        else:
            try:
                gpg = gnupg.GPG()
                gpg.encoding = 'utf-8'
                # Use Mageia public key
                gpg.recv_keys('pgp.mit.edu', 'EDCA7A90')
                with open(sig_file, 'rb') as g:
                    verified = gpg.verify_file(g)
                if verified.valid:
                    signature_checked = True
                    # Look for sum files in the same directory as source
                    with open(source + '.sha512', 'r') as fs:
                        # Read the sum in the file
                        sumcheck = fs.readline().split()[0]
                else:
                    self.return_message = _('Invalid signature for %s.sha512') % source
                    self.return_state = True
                    self.finished.set()
                    return
            except Exception as err:
                # Best effort: any failure here (missing file, no network,
                # no gpg binary...) only disables the comparison, the sum is
                # still computed and displayed below.
                logging.warning("Signature check failed: %s", err)
                self.return_message = _("Signature file {} not found\n").format(sig_file)
        # Compute the sum from the written device
        sha512func = hashlib.sha512()
        remaining = b
        try:
            with open(target, 'rb') as f:
                while remaining > 0:
                    block = f.read(min(self._HASH_CHUNK, remaining))
                    if not block:
                        # The target holds fewer bytes than the image.
                        break
                    sha512func.update(block)
                    remaining -= len(block)
                    self._progress = min(100, (b - remaining) * 100 // b)
        except OSError:
            self._fail(_("Reading error."))
            return
        sha512sumcalc = sha512func.hexdigest()
        if sumcheck == "":
            self.return_message += _('SHA512 sum: {}').format(sha512sumcalc)
        # compare the sums
        elif sha512sumcalc == sumcheck:
            if signature_checked:
                self.return_message += _("The sha512 sum check is OK and the sum is signed")
            else:
                self.return_message += _("The sha512 sum check is OK but the signature can't be found")
        else:
            self.return_message += _("/!\\The computed and stored sums don't match")
        self._progress = 100
        self.return_state = True
        self.finished.set()

    def __init__(self):
        """Install the ``isodumper`` gettext catalog (defines ``_``)."""
        gettext.install('isodumper', localedir='/usr/share/locale')
        # Operation running
        self.operation = False