#!/usr/bin/env python3
# $Id: de_dupe.py,v 1.52 2025/06/27 18:50:22 jdeifik Exp $
# de_dupe.py - deduplicates a filesystem, based on duplicate files
# Copyright Jeff turbo Deifik 2010.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# Oct-09-2024 JTD Added cmp_files which calls cmp(1) for big files
#                 This runs at native performance, not python performance

import os
import sys
import stat
import argparse
import filecmp
import subprocess  # for cmp_files
from collections.abc import Callable
# Explicit imports instead of "from typing import *": these are the only
# typing names this file uses, and the wildcard hid that.
from typing import Dict, List, Tuple, TypeVar

import jefflib
from jefflib import Progress_Indicator

T = TypeVar("T")

# Usage text, printed by jefflib.Usage()
desc: List[str] = [
    'Name: de_dupe - takes in one or more directory trees',
    'directories. Looks for files in the directory(s) that are duplicates',
    'Usage: de_dupe.py [--detail=#] [--minsize=#] [--maxsize=#]',
    ' [--big or --small] ] directories...',
    '',
    'example: de_dupe.py --detail=# --minsize=1000000 foo-dir ...',
    '',
    '--detail=# Specify the level of detail of output',
    '0=no output, 2=output every 100, 4=bytes saved per file, 6=verbose',
    '--minsize=# Specify the minimum file size to consider',
    '--maxsize=# Specify the maximum file size to consider',
    '-b, --big process files from big to small',
    '-s --small process files from small to big',
    '-u, --usage Print usage and exit'
]

# global constants
filecmp.BUFSIZE = 64 * 1024     # Bigger buffer size for cmp
log_interval = 100              # log output every 100 files
log_size = 100_000_000_000      # log output every 100 gig
CMP_SIZE = 10_000_000           # Call cmp(1), not python filecmp, above this size


# used by the jefflib.Uniq function
def tuple_cmp(a: tuple, b: tuple) -> int:
    """Three-way compare of two (name, inode) tuples by their second element.

    Returns 1 if a's inode is greater, -1 if smaller, 0 if equal.
    """
    if a[1] > b[1]:
        return 1
    elif a[1] < b[1]:
        return -1
    else:
        return 0
# Looks through source, trying to find a match inside of it.
# If found, temporarily rename file, make a hard link, then delete renamed file.
def look_through_one_table(source: dict, pro_ind: "Progress_Indicator") -> None:
    """Scan the size->files table and hard-link duplicate files together.

    source maps a file size (int) to a list of (file_name, inode) tuples;
    file names are bytes because the program runs stdout and all paths in
    binary mode.  Only files of equal size can be duplicates, so each size
    bucket is examined independently.  Fixed here: the original annotation
    dict[Tuple[str, int]] described neither the key nor the value type.
    """
    bytes_saved = 0     # Total bytes recovered so far, used for logging
    print_size = 0      # Size at which progress was last printed
    l_size = 0          # Bytes saved since the last log line
    l_interval = 0      # Matches made since the last log line
    pro_ind.Reset()
    sys.stdout.write(b'\nLooking for duplicates\n')
    sys.stdout.flush()
    if Opts.big:
        sorted_list = list(sorted(source.keys(), reverse=True))
    else:
        sorted_list = list(sorted(source.keys()))
    # loop through file sizes that are the same
    for size in sorted_list:
        source_files = source[size]     # List of all files with same size
        # Sort by inode
        s_source_files = sorted(source_files, key=lambda foo: foo[1])
        # Uniqify based on inode; same-inode entries are already hard links
        u_source_files = jefflib.Uniq(s_source_files, lcmp=tuple_cmp)
        if len(u_source_files) < 2:     # Can't match if less than 2 files
            continue
        if the_logging(source, size, bytes_saved, print_size,
                       l_interval > log_interval, l_size >= log_size):
            print_size = size
            l_interval = 0
            l_size = 0
        # Compute md5sum on first block of each file, return is sorted by md5sum
        m_source_files = md5sum_s(u_source_files)
        leng = len(m_source_files)
        for i in range(leng - 1):   # Check all files of the same size
            f = m_source_files[i]
            # filecmp.cmp uses a cache, which can run out of memory,
            # therefore I am clearing it periodically
            filecmp.clear_cache()
            # Against all other files of the same size, after index i
            for ff in m_source_files[(i + 1):]:
                # If the md5sum's don't match, we are done (as they are sorted)
                if f[1] != ff[1]:
                    break
                res = core(size, bytes_saved, f[0], ff[0], pro_ind)
                if res == 2:
                    bytes_saved += size
                    l_size += size
                    l_interval += 1
        del source[size]    # Save some memory


# possibly log some output from look_through_one_table
def the_logging(source, size, bytes_saved, print_size, do_interval,
                do_log_size) -> bool:
    """Print a progress line when enough work has happened since the last one.

    Returns True when a line was printed (caller then resets its counters).
    A line is printed when detail >= 2 and any trigger fires: first call,
    interval/byte counters exceeded, or the current size has moved a factor
    of two past the last printed size (halved when walking big-to-small,
    doubled when walking small-to-big).
    """
    if Opts.detail >= 2 and (print_size == 0 or do_interval or do_log_size or
                             (Opts.big and size < print_size / 2) or
                             ((not Opts.big) and size > print_size * 2)):
        # Count how many files are still ahead of us in traversal order
        if Opts.big:
            tot = count_lambda(source, lambda x: x <= size)
        else:
            tot = count_lambda(source, lambda x: x >= size)
        s = 'Size ' + jefflib.Int_With_Comma_Sep(size) + ', ' + \
            'Left ' + jefflib.Int_With_Comma_Sep(tot) + ', ' + \
            'Bytes ' + jefflib.Int_With_Comma_Sep(bytes_saved) + '\n'
        sys.stdout.write(jefflib.String_to_Bytes(s))
        sys.stdout.flush()
        return True
    return False


# the core file comparison and linking code
# Return 0 if files don't match.
# Return 1 if files match, but both already had multiple hard links
# Return 2 if files match, and file_a or file_b had link count 1 (disk space saved)
def core(siz: int, byt_sav: int, file_a: bytes, file_b: bytes,
         pro_ind: "Progress_Indicator") -> int:
    """Compare two same-sized files and hard-link them together if identical.

    Paths are bytes (the whole program works in binary mode).  The file with
    the higher link count is kept; the other is replaced by a link to it.
    """
    stat_a = os.stat(file_a)
    stat_b = os.stat(file_b)
    links_a = stat_a[stat.ST_NLINK]
    links_b = stat_b[stat.ST_NLINK]
    ret = 0
    # Verify inodes differ - the same inode is already the same file
    if stat_a[stat.ST_INO] != stat_b[stat.ST_INO]:
        pro_ind.P_I()
        # Call cmp(1), as it is faster for big files; shallow=False forces
        # a full content compare in filecmp
        res = cmp_files(file_a, file_b) if siz > CMP_SIZE \
            else filecmp.cmp(file_a, file_b, False)
        if res:
            # Make link to file having the highest link count
            if links_b >= links_a:
                res = file_to_link(file_a, file_b)
            else:
                res = file_to_link(file_b, file_a)
            if res:
                ret = 1
                if Opts.detail >= 6:
                    # Fixed: separate the two names so the log is readable
                    sys.stdout.write(b'Files match ' + file_a + b' ' +
                                     file_b + b'\n')
                if links_a == 1 or links_b == 1:
                    # Woo-hoo disk space is saved!
                    ret = 2
                    if Opts.detail >= 4:
                        pro_ind.Reset()
                        sys.stdout.write(jefflib.String_to_Bytes(
                            jefflib.Int_With_Comma_Sep(siz + byt_sav) + '\n'))
                        sys.stdout.flush()
    return ret


# rename link_file to something clever,
# make a hard link named link_file to keep_file
# delete renamed file
def file_to_link(link_file: bytes, keep_file: bytes) -> bool:
    """Replace link_file with a hard link to keep_file; True on success.

    link_file is first renamed out of the way so nothing is lost if the
    link step fails.  All names are bytes.
    """
    (head, tail) = os.path.split(link_file)
    # If the file doesn't have a path, then just use the file name, with no path
    if head:
        tmp_name = head + b'/de_dupe-' + tail
    else:
        tmp_name = b'de_dupe-' + tail
    ret = False
    # Rename link_file to tmp_name
    if jefflib.Try_To_Rename(link_file, tmp_name, verbose=False):
        # Make the hard link
        if jefflib.Try_To_Link(keep_file, link_file, verbose=False):
            # Delete the renamed link_file
            if jefflib.Try_To_Remove(tmp_name, verbose=False):
                ret = True
    return ret


# Generated by chatgpt
def cmp_files(file1, file2):
    """Return True when file1 and file2 have identical contents, via cmp(1)."""
    try:
        # Execute the cmp command and suppress output (-s)
        result = subprocess.run(['cmp', '-s', file1, file2], check=False)
        return result.returncode == 0
    except Exception as e:
        # Fixed: main rebinds sys.stdout to its binary buffer, so print()ing
        # a str there would raise TypeError; report the error on stderr.
        print(f"Error comparing files: {e}", file=sys.stderr)
        return False


# Compute md5sum on first block of each file
# Return list of (file_name, md5sum) tuples, sorted by md5sum
def md5sum_s(tuples: list) -> list:
    """Map (name, inode) tuples to (name, md5-of-first-block), sorted by md5.

    Fixed: the original annotation List[str] did not match the tuples
    actually returned.  The md5 value type is whatever
    jefflib.Md5sum_On_First_Block_Of_File returns; it only needs to sort.
    """
    new_lis = []
    for tup in tuples:
        md = jefflib.Md5sum_On_First_Block_Of_File(tup[0])
        new_lis.append((tup[0], md))
    # Sort by md5sum so equal checksums end up adjacent
    return sorted(new_lis, key=lambda foo: foo[1])


# Return count of elements in the dictionary whose key satisfies lam.
# Each dict value is a list; only its length is used.
def count_lambda(dic: dict, lam: Callable) -> int:
    # Add number of elements for each value, when lam is true for the key
    return sum(len(v) for k, v in dic.items() if lam(k))


# Top level code
if __name__ == '__main__':
    # Command line parsing code
    Parser = argparse.ArgumentParser()
    Parser.add_argument("--minsize", dest="minsize", action="store", type=int,
                        default=256, help="specify min size")
    Parser.add_argument("--maxsize", dest="maxsize", action="store", type=int,
                        default=0, help="specify max size")
    Parser.add_argument("--detail", dest="detail", action="store", type=int,
                        default=2, help="specify detail level")
    Parser.add_argument("-b", "--big", dest="big", action="store_true",
                        help="big to small", default=True)
    Parser.add_argument("-s", "--small", dest="big", action="store_false",
                        help="small to big")
    Parser.add_argument("-u", "--usage", dest="use", action="store_true",
                        default=False, help="Print usage and exit")
    Parser.add_argument("args", nargs='*', help="the real arguments")
    Opts = Parser.parse_args()
    if Opts.use:
        jefflib.Usage(desc)
    if len(Opts.args) == 0:
        print("Need to specify one or more directories", file=sys.stderr)
        jefflib.Usage(desc)
    # Make args into binary strings
    dirs = jefflib.List_String_to_Bytes(Opts.args)
    sys.stdout = sys.stdout.buffer  # Put stdout in binary mode
    # Must create this object after setting sys.stdout to binary
    progress_ind = Progress_Indicator(1000, 1)
    try:
        the_table = jefflib.files_and_sizes(dirs, Opts.minsize, Opts.maxsize,
                                            progress_ind.P_I,
                                            ignore_links=False, bin=True)
        look_through_one_table(the_table, progress_ind)
    except KeyboardInterrupt:
        sys.stdout.write(b'\nKeyboard interrupt occurred\n')
        sys.exit(1)