Fbrowser/stanzip.py
2024-06-29 01:01:51 -05:00

152 lines
6.0 KiB
Python
Executable File

# stanzip.py
# Description: Compresses and extracts archive files using various libraries; more compression methods will be added.
#
import argparse
import os
import shutil
import tarfile
import zipfile
from concurrent.futures import ThreadPoolExecutor

import py7zr
import rarfile
import tqdm
# File Extractor
class Extractor:
    """Extract ZIP, RAR, 7z and tar archives while showing a progress bar."""

    # Suffixes routed to the standard-library ``tarfile`` module.  Whether
    # ``.tar.zst`` can actually be read depends on the running Python's
    # tarfile; when it cannot, tarfile raises an error that is reported like
    # any other extraction failure.
    _TAR_SUFFIXES = (".tar.gz", ".tar.bz2", ".tar.xz", ".tar.zst")

    def zipviewer(self, source, destination):
        """Extract the archive at *source* into the directory *destination*.

        The archive format is chosen from the file suffix (compared
        case-insensitively).  Problems are reported on stdout rather than
        raised, so the method always returns ``None``.

        Args:
            source: Path to the archive file.
            destination: Directory to extract into; created when missing.
        """
        if not os.path.exists(source):
            print(f"Error: Archive file not found: {source}")
            return

        # Decide on the handler before touching the filesystem so that an
        # unsupported file does not leave an empty destination directory.
        lowered = os.fspath(source).lower()
        if lowered.endswith(".zip"):
            extract = self._extract_zip
        elif lowered.endswith(".rar"):
            extract = self._extract_rar
        elif lowered.endswith(".7z"):
            extract = self._extract_7z
        elif lowered.endswith(self._TAR_SUFFIXES):
            extract = self._extract_tar
        else:
            print(f"Unsupported file format: {source}")
            return

        try:
            os.makedirs(destination, exist_ok=True)
            extract(source, destination)
        except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
            print(f"ZIP Extraction Error: {e}")
        except rarfile.Error as e:
            # rarfile.Error is the base class of the rarfile exceptions.
            print(f"RAR Extraction Error: {e}")
        except py7zr.exceptions.ArchiveError as e:
            print(f"7z Extraction Error: {e}")
        except tarfile.TarError as e:
            print(f"TAR Extraction Error: {e}")
        except OSError as e:
            print(f"Extraction Error: {e}")
        else:
            print(f"Extracted all files from {source} to {destination}")

    def _extract_zip(self, source, destination):
        """Extract every member of a ZIP archive, one progress tick each."""
        with zipfile.ZipFile(source, 'r') as zip_ref:
            names = zip_ref.namelist()
            with tqdm.tqdm(total=len(names), desc="Extracting ZIP files") as pbar:
                for filename in names:
                    zip_ref.extract(filename, destination)
                    pbar.update(1)

    def _extract_rar(self, source, destination):
        """Extract every member of a RAR archive, one progress tick each."""
        with rarfile.RarFile(source, 'r') as rar_ref:
            names = rar_ref.namelist()
            with tqdm.tqdm(total=len(names), desc="Extracting RAR files") as pbar:
                for filename in names:
                    rar_ref.extract(filename, destination)
                    pbar.update(1)

    def _extract_7z(self, source, destination):
        """Extract a 7z archive in a single pass."""
        with py7zr.SevenZipFile(source, 'r') as sevenzip_ref:
            names = sevenzip_ref.getnames()
            with tqdm.tqdm(total=len(names), desc="Extracting 7z files") as pbar:
                # One extractall() call handles the whole archive, so the bar
                # is advanced once for all members instead of per member.
                sevenzip_ref.extractall(path=destination)
                pbar.update(len(names))

    def _extract_tar(self, source, destination):
        """Extract every member of a (possibly compressed) tar archive."""
        # Use the "data" extraction filter when this Python's tarfile offers
        # it, so members with unsafe paths are rejected instead of written.
        extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        with tarfile.open(source, "r:*") as tar_ref:
            members = tar_ref.getmembers()
            with tqdm.tqdm(total=len(members), desc="Extracting TAR files") as pbar:
                for member in members:
                    tar_ref.extract(member, destination, **extract_kwargs)
                    pbar.update(1)
# File Compressor
class Compressor:
    """Create ZIP archives from a single file or from a directory tree."""

    def __init__(self):
        pass

    def _compress_folder(self, source_path, zip_file):
        """Add every file below *source_path* to *zip_file*.

        Entries are named by their path relative to *source_path*, so the
        directory layout is preserved inside the archive.
        """
        for root, _, files in os.walk(source_path):
            for file in files:
                file_path = os.path.join(root, file)
                archive_path = os.path.relpath(file_path, source_path)
                self._compress_file(file_path, zip_file, archive_path)

    def _compress_file(self, file_path, zip_file, archive_path=None):
        """Add one file to *zip_file* as a single entry.

        Args:
            file_path: Path of the file on disk.
            zip_file: Open, writable ``zipfile.ZipFile``.
            archive_path: Entry name inside the archive; defaults to the
                base name of *file_path*.
        """
        if not archive_path:
            archive_path = os.path.basename(file_path)
        # Never add the archive to itself: the output file lands inside the
        # source folder when the source path is given with a trailing slash.
        target = getattr(zip_file, "filename", None)
        if target and os.path.realpath(file_path) == os.path.realpath(target):
            return
        # ZipFile.write stores the whole file as one entry; calling writestr
        # once per chunk would create a separate entry for every chunk.
        zip_file.write(file_path, archive_path)

    def compress(self, source_path, archive_name, archive_format="zip"):
        """Compress *source_path* into ``<archive_name>.<archive_format>``.

        The archive is written to ``os.path.dirname(source_path)``.

        Args:
            source_path: File or directory to compress.
            archive_name: Archive file name without extension.
            archive_format: Only ``"zip"`` is supported.

        Raises:
            ValueError: If *archive_format* is not ``"zip"``.
        """
        if archive_format != "zip":
            raise ValueError(f"Unsupported archive format: {archive_format}")
        archive_path = os.path.join(os.path.dirname(source_path), f"{archive_name}.{archive_format}")
        # Check if source path exists
        if not os.path.exists(source_path):
            print(f"Source path does not exist: {source_path}")
            return
        # Compress the source path in a single pass; each file is added
        # exactly once under its relative path.
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            if os.path.isdir(source_path):
                print(f"Compressing folder: {source_path}")
                self._compress_folder(source_path, zip_file)
            else:
                print(f"Compressing file: {source_path}")
                self._compress_file(source_path, zip_file)
        # Reported after the with-block so the archive is closed and complete.
        print(f"Compressed to: {archive_path}")
def main(argv=None):
    """Command-line entry point: dispatch to extraction or compression.

    Args:
        argv: Optional list of command-line arguments.  When ``None``,
            argparse reads the arguments from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description="Compress or extract files")
    subparsers = parser.add_subparsers(title="Command", dest="command")
    # Subparser for extraction
    extract_parser = subparsers.add_parser("extract")
    extract_parser.add_argument("source", help="Path to the archive file")
    extract_parser.add_argument("destination", help="Extraction directory")
    # Subparser for compression
    compress_parser = subparsers.add_parser("compress")
    compress_parser.add_argument("source", help="Path to the file or folder to compress")
    compress_parser.add_argument("archive_name", help="Name for the compressed archive")
    compress_parser.add_argument("-f", "--format", choices=["zip"], default="zip", help="Archive format (default: zip)")
    args = parser.parse_args(argv)
    # A single if/elif/else chain: with two separate ``if`` statements the
    # "Invalid command" message was also printed after every extraction.
    if args.command == "extract":
        extractor = Extractor()
        extractor.zipviewer(args.source, args.destination)
    elif args.command == "compress":
        compressor = Compressor()
        compressor.compress(args.source, args.archive_name, args.format)
    else:
        print("Invalid command. Use 'extract' or 'compress'")


if __name__ == "__main__":
    main()