aboutsummaryrefslogtreecommitdiffstats
path: root/backend/raw_write.py
diff options
context:
space:
mode:
authorPapoteur <papoteur@mageia.org>2023-06-30 11:31:02 +0200
committerPapoteur <papoteur@mageia.org>2023-06-30 11:31:02 +0200
commit2a774405fbcf1f3b5b7ba2d62db5745c060347f9 (patch)
treea0bf7d14ee7915d0b83de3ea936ba973a780cd02 /backend/raw_write.py
parent23cd3e7048252777e6bb38288b66c14c2ec718db (diff)
downloadisodumper-2a774405fbcf1f3b5b7ba2d62db5745c060347f9.tar
isodumper-2a774405fbcf1f3b5b7ba2d62db5745c060347f9.tar.gz
isodumper-2a774405fbcf1f3b5b7ba2d62db5745c060347f9.tar.bz2
isodumper-2a774405fbcf1f3b5b7ba2d62db5745c060347f9.tar.xz
isodumper-2a774405fbcf1f3b5b7ba2d62db5745c060347f9.zip
Formatting code with black
Diffstat (limited to 'backend/raw_write.py')
-rwxr-xr-xbackend/raw_write.py271
1 files changed, 159 insertions, 112 deletions
diff --git a/backend/raw_write.py b/backend/raw_write.py
index fe0fda9..b30f759 100755
--- a/backend/raw_write.py
+++ b/backend/raw_write.py
@@ -1,4 +1,4 @@
-#coding:utf-8
+# coding:utf-8
#!/usr/bin/python3
#
@@ -27,6 +27,7 @@ import gettext
import hashlib
import io
import logging
+
###########
# imports #
###########
@@ -38,17 +39,16 @@ from subprocess import Popen, PIPE
class Dumper(object):
-
- def _do_write(self,source,target, b, backup_mode, uid, gid):
+ def _do_write(self, source, target, b, backup_mode, uid, gid):
# Writing step
- #Dump mode
- self.returncode=0
- self.operation=True
- bs=4096*128
+ # Dump mode
+ self.returncode = 0
+ self.operation = True
+ bs = 4096 * 128
try:
- ifc=io.open(source, "rb",1)
+ ifc = io.open(source, "rb", 1)
except:
- message = _('Reading error.')+ source
+ message = _("Reading error.") + source
logging.debug(message)
self.return_state = False
self.return_message = message
@@ -56,27 +56,29 @@ class Dumper(object):
return
else:
try:
- ofc= io.open(target, 'wb',0)
+ ofc = io.open(target, "wb", 0)
except:
- message = _("You don't have permission to write to the device {}").format(target)
- logging.error(message)
- self.return_state = False
- self.return_message = message
- self.finished.set()
- return
+ message = _(
+ "You don't have permission to write to the device {}"
+ ).format(target)
+ logging.error(message)
+ self.return_state = False
+ self.return_message = message
+ self.finished.set()
+ return
else:
- self.operation=True
- steps=list(range(0, b+1, int(b/100)))
+ self.operation = True
+ steps = list(range(0, b + 1, int(b / 100)))
steps.append(b)
- indice=1
- written=0
- ncuts=int(b/bs)
+ indice = 1
+ written = 0
+ ncuts = int(b / bs)
while ncuts <= 100:
- bs=int(bs/2)
- ncuts=int(b/bs)
- for i in range(0,int(ncuts)+1):
+ bs = int(bs / 2)
+ ncuts = int(b / bs)
+ for i in range(0, int(ncuts) + 1):
try:
- buf=ifc.read(bs)
+ buf = ifc.read(bs)
except:
message = _("Reading error.")
logging.error(message)
@@ -93,31 +95,31 @@ class Dumper(object):
self.return_message = message
self.finished.set()
return
- written+=len(buf)
+ written += len(buf)
if written > steps[indice]:
- if indice%1==0:
+ if indice % 1 == 0:
self._progress = indice
- indice +=1
+ indice += 1
try:
os.fsync(ofc)
except:
- message = _("Writing error.")
- logging.debug(message)
- self.return_state = False
- self.return_message = message
- self.finished.set()
- return
+ message = _("Writing error.")
+ logging.debug(message)
+ self.return_state = False
+ self.return_message = message
+ self.finished.set()
+ return
if backup_mode:
# Restore user as owner
os.chown(target, uid, gid)
try:
ofc.close()
except:
- message = _("Writing error.")
- logging.error(message)
- self.return_message = message
- self.return_state = False
- return
+ message = _("Writing error.")
+ logging.error(message)
+ self.return_message = message
+ self.return_state = False
+ return
ifc.close()
self._progress = 100
self.return_state = True
@@ -131,106 +133,115 @@ class Dumper(object):
self.return_message = ""
b = os.path.getsize(source)
# Compute the sum from the written device
- steps=list(range(0, b+1, int(b/100)))
+ steps = list(range(0, b + 1, int(b / 100)))
steps.append(b)
- indice=0
- checked=0
- sha512func=hashlib.sha3_512()
- ncuts=int(b/1024)
- with open(target, 'rb') as f:
- for x in range(0,ncuts):
+ indice = 0
+ checked = 0
+ sha512func = hashlib.sha3_512()
+ ncuts = int(b / 1024)
+ with open(target, "rb") as f:
+ for x in range(0, ncuts):
block = f.read(1024)
sha512func.update(block)
if checked > steps[indice]:
self._progress = indice
- indice +=1
- checked+=1024
- logging.debug("last read %d"%(b-ncuts*1024))
- block = f.read(b-ncuts*1024)
+ indice += 1
+ checked += 1024
+ logging.debug("last read %d" % (b - ncuts * 1024))
+ block = f.read(b - ncuts * 1024)
sha512func.update(block)
- sha512sumcalc=sha512func.hexdigest().upper()
+ sha512sumcalc = sha512func.hexdigest().upper()
self.sum_check = ""
sum_type = "sha3"
sum_file = f"{source}.{sum_type}"
try:
# Look for sum files in the same directory as source
- with open(sum_file,'r') as fs:
+ with open(sum_file, "r") as fs:
# Read the sum in the file
- self.sum_check=(fs.readline()).split()[0]
+ self.sum_check = (fs.readline()).split()[0]
self.sum_file = True
except:
logging.info(_("Sum file {} not found\n").format(sum_file))
self.sum_file = False
self.return_state = True
- if (self.sum_check == "") : # Can't get stored sum
- self.return_message += _('SHA3 sum: {}').format(sha512sumcalc)
+ if self.sum_check == "": # Can't get stored sum
+ self.return_message += _("SHA3 sum: {}").format(sha512sumcalc)
# compare the sums
- elif (sha512sumcalc == self.sum_check) :
- #, keep the bracket, this is the place for sum type
- self.return_message +="\n" + _("The {} sum check is OK").format(sum_type)
+ elif sha512sumcalc == self.sum_check:
+ # , keep the bracket, this is the place for sum type
+ self.return_message += "\n" + _("The {} sum check is OK").format(sum_type)
else:
- self.return_message +="\n" + _("/!\\The computed and stored sums don't match")
+ self.return_message += "\n" + _(
+ "/!\\The computed and stored sums don't match"
+ )
self.return_state = False
self._progress = 100
logging.info(self.return_message)
self.finished.set()
def udev_wait(self, operation):
- wait = Popen(["udevadm","settle","--timeout=15"], stderr=PIPE)
+ wait = Popen(["udevadm", "settle", "--timeout=15"], stderr=PIPE)
wait.communicate()
- working=True
+ working = True
while working:
time.sleep(0.5)
wait.poll()
- rc=wait.returncode
+ rc = wait.returncode
if not (rc is None):
wait = None
- working= False
+ working = False
if rc != 0:
self.return_state = False
self.return_message = _("Timeout reached when {}".format(operation))
return False
return True
-
def _do_persistence(self, target, label, key):
logging.debug("Start doing persistence partition")
- p = Popen(["fdisk",target], stdin = PIPE, stderr=PIPE)
- outs, errs = p.communicate(input=b'n\np\n3\n\n\nw\n')
- working=True
+ p = Popen(["fdisk", target], stdin=PIPE, stderr=PIPE)
+ outs, errs = p.communicate(input=b"n\np\n3\n\n\nw\n")
+ working = True
while working:
time.sleep(0.5)
p.poll()
- rc=p.returncode
+ rc = p.returncode
if rc is None:
- working=True
+ working = True
else:
process = None
- working= False
+ working = False
if rc != 0:
if rc == 1:
# the error is Re-reading the partition table failed.: Device or resource busy
- logging.error(_("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8')))
+ logging.error(
+ _("Error {} while doing persistent partition: {}").format(
+ rc, errs.decode("utf-8")
+ )
+ )
logging.error(_("Try reloading partition table"))
- p = Popen(["partprobe",target], stdin = PIPE, stderr=PIPE)
+ p = Popen(["partprobe", target], stdin=PIPE, stderr=PIPE)
outs, errs = p.communicate()
- working=True
+ working = True
while working:
time.sleep(0.5)
p.poll()
- rc=p.returncode
+ rc = p.returncode
if not (rc is None):
process = None
- working= False
+ working = False
else:
self.return_state = False
- self.return_message = _("Unable to reload the partition table: {}").format(errs.decode('utf-8'))
+ self.return_message = _(
+ "Unable to reload the partition table: {}"
+ ).format(errs.decode("utf-8"))
logging.error(self.return_message)
self.finished.set()
return
else:
self.return_state = False
- self.return_message = _("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8'))
+ self.return_message = _(
+ "Error {} while doing persistent partition: {}"
+ ).format(rc, errs.decode("utf-8"))
logging.error(self.return_message)
self.finished.set()
return
@@ -243,25 +254,29 @@ class Dumper(object):
if key == "":
# example mkfs.ext4 -L mgalive-persist /dev/sdf3
self.return_message = _("Persistent partition added. Formatting...")
- process = Popen(['mkfs.ext4', "-q",'-F','-L', label, target+"3"],stderr=PIPE)
+ process = Popen(
+ ["mkfs.ext4", "-q", "-F", "-L", label, target + "3"], stderr=PIPE
+ )
outs, errs = process.communicate()
- working=True
+ working = True
while working:
time.sleep(0.5)
process.poll()
- rc=process.returncode
+ rc = process.returncode
if rc is None:
- working=True
+ working = True
else:
process = None
- working= False
+ working = False
if rc == 0:
self.return_state = True
self._progress = 100
self.return_message = _("Persistent partition done")
else:
self.return_state = False
- self.return_message = _("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8'))
+ self.return_message = _(
+ "Error {} while doing persistent partition: {}"
+ ).format(rc, errs.decode("utf-8"))
self.finished.set()
return
logging.info("Persistent partition done")
@@ -271,21 +286,27 @@ class Dumper(object):
# example cryptsetup luksFormat /dev/sdb3
self.return_message = _("Persistent partition added. Encrypting...")
base_target = os.path.basename(target) + "3"
- process = Popen(['cryptsetup','luksFormat','-q', target+"3", '-d', '-'],stdin=PIPE, stderr=PIPE)
- outs, errs = process.communicate(input=key.encode('utf-8'))
- working=True
+ process = Popen(
+ ["cryptsetup", "luksFormat", "-q", target + "3", "-d", "-"],
+ stdin=PIPE,
+ stderr=PIPE,
+ )
+ outs, errs = process.communicate(input=key.encode("utf-8"))
+ working = True
while working:
time.sleep(0.5)
process.poll()
- rc=process.returncode
+ rc = process.returncode
if rc is None:
- working=True
+ working = True
else:
process = None
- working= False
+ working = False
if rc != 0:
self.return_state = False
- self.return_message = _("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8'))
+ self.return_message = _(
+ "Error {} while doing persistent partition: {}"
+ ).format(rc, errs.decode("utf-8"))
logging.error(self.return_message)
self.finished.set()
return
@@ -295,21 +316,34 @@ class Dumper(object):
if not self.udev_wait("creating encrypted partition"):
return
- process = Popen(['cryptsetup','luksOpen', target + "3", 'crypt_' + base_target ,'-d','-'],stdin=PIPE, stderr=PIPE)
- outs, errs = process.communicate(input=key.encode('utf-8'))
- working=True
+ process = Popen(
+ [
+ "cryptsetup",
+ "luksOpen",
+ target + "3",
+ "crypt_" + base_target,
+ "-d",
+ "-",
+ ],
+ stdin=PIPE,
+ stderr=PIPE,
+ )
+ outs, errs = process.communicate(input=key.encode("utf-8"))
+ working = True
while working:
time.sleep(0.5)
process.poll()
- rc=process.returncode
+ rc = process.returncode
if rc is None:
- working=True
+ working = True
else:
process = None
- working= False
+ working = False
if rc != 0:
self.return_state = False
- self.return_message = _("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8'))
+ self.return_message = _(
+ "Error {} while doing persistent partition: {}"
+ ).format(rc, errs.decode("utf-8"))
logging.error(self.return_message)
self.finished.set()
return
@@ -319,21 +353,33 @@ class Dumper(object):
self._progress = 60
self.return_message = _("Persistent partition opened. Formatting...")
# mkfs.ext4 -L mgalive-persist /dev/mapper/crypt_sdb3
- process = Popen(['mkfs.ext4','-q','-F','-L', label, '/dev/mapper/crypt_' + base_target],stderr=PIPE)
+ process = Popen(
+ [
+ "mkfs.ext4",
+ "-q",
+ "-F",
+ "-L",
+ label,
+ "/dev/mapper/crypt_" + base_target,
+ ],
+ stderr=PIPE,
+ )
outs, errs = process.communicate()
- working=True
+ working = True
while working:
time.sleep(0.5)
process.poll()
- rc=process.returncode
+ rc = process.returncode
if rc is None:
- working=True
+ working = True
else:
process = None
- working= False
+ working = False
if rc != 0:
self.return_state = False
- self.return_message = _("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8'))
+ self.return_message = _(
+ "Error {} while doing persistent partition: {}"
+ ).format(rc, errs.decode("utf-8"))
logging.error(self.return_message)
self.finished.set()
return
@@ -342,21 +388,23 @@ class Dumper(object):
if not self.udev_wait(_("formatting encrypted partition")):
return
- process = Popen(['cryptsetup','luksClose', 'crypt_' + base_target ])
+ process = Popen(["cryptsetup", "luksClose", "crypt_" + base_target])
outs, errs = process.communicate()
- working=True
+ working = True
while working:
time.sleep(0.5)
process.poll()
- rc=process.returncode
+ rc = process.returncode
if rc is None:
- working=True
+ working = True
else:
process = None
- working= False
+ working = False
if rc != 0:
self.return_state = False
- self.return_message = _("Error {} while doing persistent partition: {}").format(rc, errs.decode('utf-8'))
+ self.return_message = _(
+ "Error {} while doing persistent partition: {}"
+ ).format(rc, errs.decode("utf-8"))
logging.error(self.return_message)
else:
self.return_state = True
@@ -368,8 +416,7 @@ class Dumper(object):
self.finished.set()
logging.debug("Finished")
-
def __init__(self):
- gettext.install('isodumper', localedir='/usr/share/locale')
+ gettext.install("isodumper", localedir="/usr/share/locale")
# Operation running
- self.operation=False
+ self.operation = False