"""Command-line utility that packs a file or a folder into a zip, tar or 7z archive."""

# imports
import argparse
import os
import shutil
import tarfile
import zipfile
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import pool

# Third-party packages are optional at import time so that the formats backed
# by the standard library keep working when one of them is not installed.
try:
    import py7zr
except ImportError:  # only required for the "7z" format
    py7zr = None
try:
    import rarfile
except ImportError:
    rarfile = None
try:
    import tqdm
except ImportError:  # progress reporting is skipped without it
    tqdm = None


# File Compressor
class Compressor:
    """Create zip, tar or 7z archives from a single file or a directory tree."""

    # Archive formats accepted by compress(); also used for the CLI choices.
    SUPPORTED_FORMATS = ("zip", "tar", "7z")

    def __init__(self):
        """Create a compressor; it keeps no state between calls."""

    @staticmethod
    def _count_files(source_path):
        """Return the number of regular entries os.walk reports under *source_path*."""
        return sum(len(files) for _, _, files in os.walk(source_path))

    def _open_archive(self, archive_path, archive_format):
        """Open a new archive of *archive_format* at *archive_path* for writing.

        Raises:
            ImportError: if the 7z format is requested but py7zr is missing.
            ValueError: if *archive_format* is not supported.
        """
        if archive_format == "zip":
            return zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED)
        if archive_format == "tar":
            return tarfile.open(archive_path, mode="w")
        if archive_format == "7z":
            if py7zr is None:
                raise ImportError("The 'py7zr' package is required for the 7z format")
            return py7zr.SevenZipFile(archive_path, mode="w")
        raise ValueError(f"Unsupported archive format: {archive_format}")

    def _compress_folder(self, source_path, archive_file, archive_format,
                         skip_path=None, progress=None):
        """Add every file below *source_path* to *archive_file*.

        Members are named relative to *source_path*. Directories and files are
        visited in sorted order so the archive layout is reproducible.

        Args:
            source_path: Root directory to walk.
            archive_file: Open archive object matching *archive_format*.
            archive_format: One of "zip", "tar" or "7z".
            skip_path: Optional real path of a file that must not be added
                (the archive being written, when it lives inside the tree).
            progress: Optional object with an ``update(n)`` method that is
                advanced by one for each file added.
        """
        for root, dirs, files in os.walk(source_path):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for name in sorted(files):
                file_path = os.path.join(root, name)
                if skip_path is not None and os.path.realpath(file_path) == skip_path:
                    continue
                archive_path = os.path.relpath(file_path, source_path)
                self._compress_file(file_path, archive_file, archive_path, archive_format)
                if progress is not None:
                    progress.update(1)

    def _compress_file(self, file_path, archive_file, archive_path, archive_format):
        """Add the file at *file_path* to *archive_file* under the name *archive_path*.

        Raises:
            ValueError: if *archive_format* is not supported.
        """
        if archive_format == "zip":
            # ZipFile.write stores the whole file as ONE member. Calling
            # writestr once per chunk would create a separate, same-named
            # member for every chunk and skip empty files altogether.
            archive_file.write(file_path, arcname=archive_path)
        elif archive_format == "tar":
            archive_file.add(file_path, arcname=archive_path)
        elif archive_format == "7z":
            archive_file.write(file_path, archive_path)
        else:
            raise ValueError(f"Unsupported archive format: {archive_format}")

    def compress(self, source_path, archive_name, archive_format="zip"):
        """Pack *source_path* into ``<archive_name>.<archive_format>``.

        The archive is created in the directory that contains *source_path*.
        Problems with the paths are reported on stdout and the call returns
        without creating anything; errors raised while adding files are
        reported on stdout as well.

        Args:
            source_path: File or directory to compress.
            archive_name: Archive file name without extension.
            archive_format: One of "zip", "tar" or "7z".

        Raises:
            ValueError: if *archive_format* is not supported.
            ImportError: if "7z" is requested but py7zr is not installed.
        """
        if archive_format not in self.SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported archive format: {archive_format}")

        # Check if source path exists
        if not os.path.exists(source_path):
            print(f"Source path does not exist: {source_path}")
            return

        # Validate the kind of source before any archive file is created, so
        # an unsupported source does not leave an empty archive behind.
        is_dir = os.path.isdir(source_path)
        if not is_dir and not os.path.isfile(source_path):
            print(f"Source path is not a file or directory: {source_path}")
            return

        # With a trailing separator ("folder/") the last component is empty
        # and the head is the folder itself; step up once more so the archive
        # is placed next to the folder rather than inside it.
        parent_dir, base_name = os.path.split(source_path)
        if not base_name:
            parent_dir = os.path.dirname(parent_dir)
        archive_path = os.path.join(parent_dir, f"{archive_name}.{archive_format}")

        # Check if archive path already exists
        if os.path.exists(archive_path):
            print(f"Archive path already exists: {archive_path}")
            return

        # Count before the archive exists so it can never be part of the total.
        total_files = self._count_files(source_path) if is_dir else 1
        pbar = None
        if tqdm is not None:
            pbar = tqdm.tqdm(total=total_files, unit="file", desc="Compressing")

        failed = False
        try:
            # Open archive file based on format
            archive_file = self._open_archive(archive_path, archive_format)
            try:
                # Compress the source path
                if is_dir:
                    # The archive may live inside the tree (e.g. source "."),
                    # so tell the walker which file to leave out.
                    self._compress_folder(
                        source_path,
                        archive_file,
                        archive_format,
                        skip_path=os.path.realpath(archive_path),
                        progress=pbar,
                    )
                else:
                    # Store a single file under its own name, not "".
                    self._compress_file(source_path, archive_file, base_name, archive_format)
                    if pbar is not None:
                        pbar.update(1)
            except Exception as e:  # boundary: report instead of crashing the CLI
                failed = True
                print(f"Failed to compress to: {archive_path} error:{e}")
            finally:
                archive_file.close()  # Ensure closing the archive file
        finally:
            if pbar is not None:
                pbar.close()

        if not failed:
            print(f"Compressed to: {archive_path}")


def main():
    """Parse the command line and run a single compression."""
    parser = argparse.ArgumentParser(description="Compress files")
    parser.add_argument("source", help="Path to the file or folder to compress")
    parser.add_argument("archive_name", help="Name for the compressed archive")
    parser.add_argument("-f", "--format", choices=list(Compressor.SUPPORTED_FORMATS),
                        default="zip", help="Archive format")
    args = parser.parse_args()

    compressor = Compressor()
    compressor.compress(args.source, args.archive_name, args.format)


if __name__ == "__main__":
    main()