#!/usr/bin/python3
# coding: utf-8
#
#  Copyright (c) 2007-2009 Canonical Ltd.
#
#  Author: Oliver Grawert
#
#  Modifications 2013 from papoteur
#  and Geiger David
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

###########
# imports #
###########
#import locale
import os
import io
import sys
import gettext
from subprocess import call, Popen, PIPE
import hashlib
import time
import logging

try:
    import gnupg
except ImportError:
    # python-gnupg is only used to verify the signature of the sum file.
    # Without it the signature is reported as not found, which is the
    # fallback _get_sum already uses when verification is impossible.
    gnupg = None


class Dumper(object):
    """Low-level operations used to dump an image onto a device.

    Results are reported through instance attributes (``return_state``,
    ``return_message``, ``_progress``).  ``_do_write`` and ``_check_write``
    also call ``self.finished.set()`` when they are done; ``finished`` is not
    created here and must be supplied by a subclass or by the caller (any
    object with a ``set()`` method, e.g. ``threading.Event``).

    ``_`` is the gettext function installed as a builtin by ``__init__``.
    """

    # Extension of the checksum file expected next to the image
    _SUM_TYPE = 'sha3'
    # Starting chunk size for writing; halved for small images (see
    # _copy_stream) but never below _MIN_WRITE_CHUNK
    _WRITE_CHUNK = 4096 * 128
    _MIN_WRITE_CHUNK = 512
    # Chunk size used when reading the device back to compute its sum
    _CHECK_CHUNK = 1024 * 1024

    def _finish(self, state, message, log=logging.error):
        """Record the outcome of an operation, then signal its completion."""
        log(message)
        self.return_state = state
        self.return_message = message
        # Signal last, so that whoever waits on the event reads the final state
        self.finished.set()

    def _copy_stream(self, ifc, ofc, b):
        """Copy *ifc* to *ofc* until end of file, updating ``_progress``.

        *b* is the expected number of bytes, used only for the progress.
        Return None on success, or the (translated) error message.
        """
        bs = self._WRITE_CHUNK
        # Keep chunks below 1% of the image so the progress moves smoothly.
        # The lower bound avoids dividing by a zero chunk size on tiny images.
        while bs > self._MIN_WRITE_CHUNK and b // bs <= 100:
            bs //= 2
        written = 0
        percent = 0
        while True:
            try:
                buf = ifc.read(bs)
            except OSError:
                return _("Reading error.")
            if not buf:
                return None
            try:
                # The target is opened unbuffered: write() may accept only a
                # part of the data, so loop until everything is written.
                view = memoryview(buf)
                while len(view):
                    view = view[ofc.write(view):]
            except OSError:
                return _("Writing error.")
            written += len(buf)
            reached = min(100, written * 100 // b) if b > 0 else 100
            if reached > percent:
                percent = reached
                self._progress = percent
                try:
                    # Flush to the device so the progress reflects real writes
                    os.fsync(ofc)
                except OSError:
                    return _("Writing error.")

    def _do_write(self, source, target, b):
        """Write the image *source* onto *target*.

        source: path of the image file.
        target: path of the device (or file) to write to.
        b: size of the image in bytes, used to compute the progress.

        Sets ``return_state``/``return_message``, updates ``_progress``
        (100 on success) and always sets ``self.finished`` before returning.
        """
        # Writing step
        # Dump mode
        self.returncode = 0
        self.operation = True
        try:
            ifc = io.open(source, "rb")
        except OSError:
            self._finish(False, _('Reading error.') + source)
            return
        with ifc:
            try:
                ofc = io.open(target, 'wb', 0)
            except OSError:
                self._finish(False, _("You don't have permission to write to the device {}").format(target))
                return
            failure = None
            try:
                with ofc:
                    failure = self._copy_stream(ifc, ofc, b)
            except OSError:
                # Raised while closing the target: data may not be on the device
                failure = failure or _("Writing error.")
        if failure is not None:
            self._finish(False, failure)
            return
        self._progress = 100
        self._finish(True, _("Writing terminated"), logging.debug)

    def _do_unmount(self, target):
        """Unmount every mounted partition of a device.

        target: sequence whose first item is the device path; partitions are
            the /etc/mtab entries whose device starts with that path.

        Return ``(success, message)``.  *success* is False if /etc/mtab
        can't be read or if any umount fails; it is True when nothing was
        mounted.
        """
        target = target[0]
        logging.debug("Starting unmounting")
        message = _("No partition is mounted.")
        try:
            with open("/etc/mtab", "r") as mtab:
                devices = [line.strip("\n").split(" ")[0] for line in mtab]
            mounts = [device for device in devices if device.startswith(target)]
        except (OSError, ValueError, TypeError):
            # Translate first, then fill in the values
            message = _('Could not read mtab ! {} {}').format(os.getuid(), target)
            return False, message
        success = True
        if mounts:
            message = _("Unmounting all partitions of {}:\n").format(target)
            print(message)
            for device in mounts:
                message += _("Trying to unmount {}...\n").format(device)
                try:
                    # Argument list, not a shell string: the device name is
                    # never interpreted by a shell
                    retcode = call(['umount', device])
                except OSError as e:
                    success = False
                    message += _('Execution failed: {}').format(str(e))
                    print(message, file=sys.stderr)
                    continue
                if retcode == 0:
                    message += _('{} successfully unmounted').format(device)
                    continue
                # One failure makes the whole operation fail
                success = False
                if retcode == 32:
                    message += _('Partition {} is busy').format(device)
                elif retcode < 0:
                    # A negative return code is the negated signal number
                    message += _('Error, umount {} was terminated by signal {}').format(device, -retcode)
                else:
                    message += _('Error, umount returned {}').format(str(retcode))
        logging.info(message)
        return success, message

    def _get_sum(self, source):
        """Look for the stored sum of *source* and for its signature.

        The sum is read from ``<source>.sha3`` and its detached signature
        from ``<source>.sha3.gpg``.  Sets ``sum_check`` (first word of the
        sum file, "" if unavailable), ``sum_file``, ``signature_found`` and
        ``signature_checked``.
        """
        self.return_state = False
        logging.debug("Starting getting sum")
        sum_type = self._SUM_TYPE
        sig_file = "{}.{}.gpg".format(source, sum_type)
        source_file = "{}.{}".format(source, sum_type)
        # Start from a clean state, so that _check_write never meets a
        # missing attribute or the result of a previous image
        self.signature_found = False
        self.signature_checked = False
        self.sum_check = ""
        self.sum_file = False
        if gnupg is None:
            logging.info("python-gnupg is not available, the signature can't be checked")
        else:
            # Check if the sum file has a valid signature
            gpg = gnupg.GPG()
            gpg.encoding = 'utf-8'
            try:
                # Use Mageia public key
                gpg.recv_keys('pgp.mit.edu', 'EDCA7A90')
                self.sum_check_searched = True
                with open(sig_file, 'rb') as g:
                    self.signature_found = True
                    verified = gpg.verify_file(g, source_file)
                self.signature_checked = bool(verified.valid)
                if self.signature_checked:
                    logging.debug("signature checked")
            except Exception:
                # Any failure here is handled as a missing signature
                self.signature_found = False
                self.signature_checked = False
                logging.info(_("Signature file {} not found\n").format(sig_file))
        try:
            # Look for sum files in the same directory as source
            with open(source_file, 'r') as fs:
                # Read the sum in the file
                self.sum_check = fs.readline().split()[0]
            self.sum_file = True
        except (OSError, ValueError, IndexError):
            # IndexError: the file exists but its first line is empty
            logging.info(_("Sum file {} not found\n").format(source_file))
            self.sum_file = False

    def _check_write(self, target, source):
        """Compute the SHA3-512 sum of what was written and compare it.

        Only the first ``os.path.getsize(source)`` bytes of *target* are
        hashed.  The result is compared with ``self.sum_check`` and reported
        in ``return_message``; ``_progress`` is updated while reading and
        ``self.finished`` is always set before returning.
        """
        logging.debug("Start checking")
        self.return_state = False
        self.return_message = ""
        sum_type = self._SUM_TYPE
        source_file = "{}.{}".format(source, sum_type)
        # Defaults apply when _get_sum has not been run before
        signature_found = getattr(self, 'signature_found', False)
        signature_checked = getattr(self, 'signature_checked', False)
        sum_check = getattr(self, 'sum_check', "")
        # Compute the sum from the written device
        sha3func = hashlib.sha3_512()
        percent = 0
        try:
            b = os.path.getsize(source)
            remaining = b
            with open(target, 'rb') as f:
                while remaining > 0:
                    block = f.read(min(self._CHECK_CHUNK, remaining))
                    if not block:
                        # Target shorter than the image
                        break
                    sha3func.update(block)
                    remaining -= len(block)
                    reached = (b - remaining) * 100 // b
                    if reached > percent:
                        percent = reached
                        self._progress = percent
        except OSError:
            self._finish(False, _("Reading error."))
            return
        sha3sumcalc = sha3func.hexdigest()
        if signature_found and not signature_checked:
            #, keep the percent sign, this is the place for source file name
            self.return_message = _('Invalid signature for %s') % source_file
        if sum_check == "":
            # Can't get stored sum
            self.return_message += _('SHA3 sum: {}').format(sha3sumcalc)
        # compare the sums
        elif sha3sumcalc == sum_check:
            if signature_checked:
                #, keep the bracket, this is the place for sum type
                self.return_message += "\n" + _("The {} sum check is OK and the sum is signed").format(sum_type)
            else:
                #, keep the bracket, this is the place for sum type
                self.return_message += "\n" + _("The {} sum check is OK but the signature can't be found").format(sum_type)
        else:
            self.return_message += "\n" + _("/!\\The computed and stored sums don't match")
        self._progress = 100
        logging.info(self.return_message)
        self.return_state = True
        self.finished.set()

    @staticmethod
    def _partition_path(device, number):
        """Return the path of partition *number* of *device*.

        A 'p' is inserted when the disk name ends with a digit:
        /dev/sdb -> /dev/sdb3, /dev/mmcblk0 -> /dev/mmcblk0p3.
        """
        separator = "p" if device[-1:].isdigit() else ""
        return "{}{}{}".format(device, separator, number)

    @staticmethod
    def _run(command, data=None):
        """Run *command* until it ends, sending *data* (bytes) to its stdin.

        Return ``(returncode, stderr)`` with stderr decoded as text.  If the
        command can't be started, the return code is 127 and the text is the
        error.
        """
        try:
            process = Popen(command,
                            stdin=PIPE if data is not None else None,
                            stderr=PIPE)
        except OSError as error:
            return 127, str(error)
        # communicate() waits for the end of the process
        errs = process.communicate(input=data)[1]
        return process.returncode, errs.decode('utf-8', 'replace')

    def _do_persistence(self, target, label, key):
        """Create a persistent partition on *target*.

        A third primary partition is created with fdisk, then formatted as
        ext4 with *label*.  If *key* is not empty, the partition is first
        set up as a LUKS volume using *key* as passphrase.

        Sets ``return_state`` and ``return_message``.
        """
        logging.debug("Start doing persistence partition")
        partition = self._partition_path(target, 3)

        def failed(errs):
            self.return_state = False
            self.return_message = _("Error while doing persistent partition: ") + errs
            logging.error(self.return_message)

        # fdisk answers: new, primary, number 3, default start, default end, write
        rc, errs = self._run(["fdisk", target], b'n\np\n3\n\n\nw\n')
        if rc != 0:
            failed(errs)
            return
        logging.debug("New partition created")
        mapping = None
        if key == "":
            # example mkfs.ext4 -L mgalive-persist /dev/sdf3
            filesystem_device = partition
        else:
            print("Crypt key provided", file=sys.stderr)
            passphrase = key.encode('utf-8')
            # example cryptsetup luksFormat /dev/sdb3
            rc, errs = self._run(['cryptsetup', 'luksFormat', '-q', partition, '-d', '-'], passphrase)
            if rc != 0:
                failed(errs)
                return
            # cryptsetup open /dev/sdb3 crypt_sdb3
            mapping = 'crypt_' + os.path.basename(partition)
            rc, errs = self._run(['cryptsetup', 'luksOpen', partition, mapping, '-d', '-'], passphrase)
            if rc != 0:
                failed(errs)
                return
            # mkfs.ext4 -L mgalive-persist /dev/mapper/crypt_sdb3
            filesystem_device = '/dev/mapper/' + mapping
        rc, errs = self._run(['mkfs.ext4', '-L', label, filesystem_device])
        if mapping is not None:
            # cryptsetup close crypt_sdb3
            # Done even when mkfs failed, so the mapping is never left open
            close_rc, close_errs = self._run(['cryptsetup', 'luksClose', mapping])
            if rc == 0:
                rc, errs = close_rc, close_errs
        if rc != 0:
            failed(errs)
            return
        self.return_state = True
        self.return_message = _("Persistent partition done")
        logging.info("Persistent partition done")

    def __init__(self):
        """Install the translation function ``_`` and reset the state."""
        gettext.install('isodumper', localedir='/usr/share/locale')
        # Operation running
        self.operation = False