#coding:utf-8 #!/usr/bin/python3 # # Copyright (c) 2007-2009 Canonical Ltd. # # Author: Oliver Grawert # # Modifications 2013 from papoteur # and Geiger David # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. ########### # imports # ########### #import locale import os import io import gettext from gettext import gettext as _ import pydbus class Dumper(object): def do_write(self,source,dev): target = dev.split('(')[1].split(')')[0] self.do_umount(target) # Writing step #Dump mode self.returncode=0 b=os.path.getsize(source) self.operation=True bs=4096*128 try: ifc=io.open(source, "rb",1) except: message = _('Reading error.')+ source self.logger(message) return False,message else: try: ofc= io.open(target, 'wb',0) except: message = _("You don't have permission to write to the device {}").format(target) self.logger(message) return False,message else: self.progress.setLabel(_('Writing {source} to {target}').format(source=source.split('/')[-1],target=target.split('/')[-1]) self.logger(_('Executing copy from ')+source+_(' to ')+target) steps=list(range(0, b+1, int(b/100))) steps.append(b) indice=1 written=0 ncuts=int(b/bs) while ncuts <= 100: bs=int(bs/2) ncuts=int(b/bs) for i in range(0,int(ncuts)+1): try: buf=ifc.read(bs) except: message = _("Reading error.") self.logger(message) return False, message try: ofc.write(buf) except: message = _("Writing error.") self.logger(message) return False, message written+=len(buf) if written > steps[indice]: if indice%1==0: self.logger(_('Wrote: ')+str(indice)+'% '+str(written)+' bytes') self.sendProgress(indice) self.dialog.pollEvent() indice +=1 try: os.fsync(ofc) except: message = _("Writing error.") self.logger(message) return False,message self.progress.setValue(100) self.logger(_('Image ')+source.split('/')[-1]+_(' successfully written to ')+target) self.logger(_('Bytes written: ')+str(written)) try: ofc.close() except: message = _("Writing error.") self.logger(message) ifc.close() return True, _("Succes") def do_umount(self, target): try: lines = [line.strip("\n").split(" ") for line in open ("/etc/mtab", "r").readlines()] mounts = [mount for mount in lines if mount[0].startswith(target)] except: message = _('Could not read mtab !') self.logger(message) return False, message if mounts: self.logger(_('Unmounting all partitions of ')+target+':') for mount in mounts: self.logger(_('Trying to unmount ')+mount[0]+'...') try: retcode = call('umount '+mount[0], shell=True) if retcode == 32: message = _('Partition %s is busy')%mount[0] elif retcode< 0: message = _('Error, umount ')+mount[0]+_(' was terminated by signal ')+str(retcode) elif retcode == 0: message=_('{} successfully unmounted').format(mount[0]) else: message = _('Error, umount {} returned ').format(str(retcode)) except OSError as e: message = _('Execution failed: {}').format(str(e)) return retcode, message def __init__(self): gettext.bindtextdomain(APP, DIR) gettext.textdomain(APP) # Operation running self.operation=False