From 5f24430ad44b80ff05d1fa5499f917681d51ed38 Mon Sep 17 00:00:00 2001 From: Papoteur Date: Tue, 13 Mar 2018 22:56:44 +0100 Subject: Move raw_write.py --- backend/raw_write.py | 222 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100755 backend/raw_write.py (limited to 'backend') diff --git a/backend/raw_write.py b/backend/raw_write.py new file mode 100755 index 0000000..31d90dc --- /dev/null +++ b/backend/raw_write.py @@ -0,0 +1,222 @@ +#coding:utf-8 + +#!/usr/bin/python3 +# +# Copyright (c) 2007-2009 Canonical Ltd. +# +# Author: Oliver Grawert +# +# Modifications 2013 from papoteur +# and Geiger David +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. + +########### +# imports # +########### +#import locale +import os +import io +import gettext +import pydbus +from subprocess import call +import hashlib +import gnupg + +class Dumper(object): + + def _do_write(self,source,target): + # Writing step + #Dump mode + self.returncode=0 + b=os.path.getsize(source) + self.operation=True + bs=4096*128 + try: + ifc=io.open(source, "rb",1) + except: + message = _('Reading error.')+ source + self.logger(message) + self.return_state = False + self.return_message = message + self.finished.set() + return + else: + try: + ofc= io.open(target, 'wb',0) + except: + message = _("You don't have permission to write to the device {}").format(target) + self.logger(message) + self.return_state = False + self.return_message = message + self.finished.set() + return + else: + self.operation=True + steps=list(range(0, b+1, int(b/100))) + steps.append(b) + indice=1 + written=0 + ncuts=int(b/bs) + while ncuts <= 100: + bs=int(bs/2) + ncuts=int(b/bs) + print(ncuts,bs) + for i in range(0,int(ncuts)+1): + try: + buf=ifc.read(bs) + except: + message = _("Reading error.") + self.logger(message) + self.return_state = False + self.return_message = message + self.finished.set() + return + try: + ofc.write(buf) + except: + message = _("Writing error.") + self.return_state = False + self.return_message = message + self.finished.set() + return + written+=len(buf) + if written > steps[indice]: + if indice%1==0: + self._progress = indice + indice +=1 + try: + os.fsync(ofc) + except: + message = _("Writing error.") + self.return_state = False + self.return_message = message + self.finished.set() + return + try: + ofc.close() + except: + message = _("Writing error.") + self.logger(message) + self.return_message = message + self.return_state = False + return + ifc.close() + self._progress = 100 + self.finished.set() + self.return_state = True + self.return_message = _("Success") + return + + def _do_unmount(self, target): + target = target[0] + message = _("No partition is mounted.") + retcode = 0 + try: + lines = [line.strip("\n").split(" ") for line in open ("/etc/mtab", "r").readlines()] + mounts = [mount for mount in lines if mount[0].startswith(target)] + except: + message = _('Could not read mtab ! {} {}'.format(os.getuid(),target)) + return False, message + if mounts: + message =_("Unmounting all partitions of {}:\n").format(target) + print(message) + for mount in mounts: + message +=_("Trying to unmount {}...\n").format(mount[0]) + try: + retcode = call('umount '+mount[0], shell=True) + if retcode == 32: + message += _('Partition {} is busy').format(mount[0]) + elif retcode< 0: + message += _('Error, umount {} was terminated by signal {}').format(mount[0],retcode) + elif retcode == 0: + message += _('{} successfully unmounted').format(mount[0]) + else: + message += _('Error, umount {} returned ').format(str(retcode)) + except OSError as e: + message += _('Execution failed: {}').format(str(e)) + print(message) + return not bool(retcode), message + + def _check_write(self, target, source): + self.return_state = False + self.return_message = "" + b = os.path.getsize(source) + # Check if the sum file has a valid signature + signature_checked = False + gpg = gnupg.GPG() + gpg.encoding = 'utf-8' + # Use Mageia public key + sig_file = source +'.sha512.gpg' + sumcheck = "" + try: + gpg.recv_keys('pgp.mit.edu', 'EDCA7A90') + with open(sig_file, 'rb') as g: + verified = gpg.verify_file(g) + if verified.valid: + signature_checked = True + # Look for sum files in the same directory as source + fs=open(source+'.sha512','r') + # Read the sum in the file + sumcheck=(fs.readline()).split()[0] + else: + self.return_message = _('Invalid signature for %s.sha512)'%source) + self.return_state = True + self.finished.set() + return + except: + self.return_message = _("Signature file {} not found\n").format(sig_file) + + # Compute the sum from the written device + steps=list(range(0, b+1, int(b/100))) + steps.append(b) + indice=0 + checked=0 + sha512func=hashlib.sha512() + ncuts=int(b/1024) + #try: + with open(target, 'rb') as f: + for x in range(0,ncuts): + block = f.read(1024) + sha512func.update(block) + if checked > steps[indice]: + self._progress = indice + indice +=1 + checked+=1024 + block = f.read(b-ncuts*1024) + sha512func.update(block) + sha512sumcalc=sha512func.hexdigest() + f.close() + if (sumcheck == "") : + self.return_message += _('SHA512 sum: {}').format(sha512sumcalc) + # compare the sums + elif (sha512sumcalc == sumcheck) : + if signature_checked: + self.return_message += _("The sha512 sum check is OK and the sum is signed") + else : + self.return_message += _("The sha512 sum check is OK but the signature can't be found") + else: + self.return_message += _("/!\\The computed and stored sums don't match") + #except: + #pass + self._progress = 100 + + self.return_state = True + self.finished.set() + + def __init__(self): + gettext.install('isodumper', localedir='/usr/share/locale') + # Operation running + self.operation=False -- cgit v1.2.1