File Signature Analyzer

This Python tool analyzes and compares file signatures to detect similarities between files despite obfuscation techniques. It features advanced n-gram analysis, entropy calculation, byte pattern detection, and specialized PE file analysis capabilities.

Setup Tutorial

Follow these steps to set up and use the File Signature Analyzer:

  1. Install Required Dependencies:
    • Ensure Python 3.6+ is installed.
    • Install packages:
      pip install numpy tqdm
  2. Download the Code:
    • Save the code as file_signature_analyzer.py.
  3. Running the Tool:
    • Basic usage:
      python file_signature_analyzer.py file1.bin file2.bin
    • Advanced options:
      python file_signature_analyzer.py file1.bin file2.bin --min-ngram 2 --max-ngram 8 --threshold 0.8 --verbose
  4. Interpreting Results:
    • The tool will output "MATCH" or "DIFFERENT" with a confidence score.
    • Use the --verbose flag to see detailed similarity metrics.

Notes:

  • Analysis of large files may take significant time.
  • PE file analysis provides additional insights for executable files.
  • Adjust threshold values based on your specific use case.
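  • For scripted workflows, the analyzer can also be driven directly from Python; see the sketch below.

The following is a minimal library-usage sketch, assuming the code is saved as file_signature_analyzer.py on the import path; file1.bin and file2.bin are placeholder paths for your own inputs:

from file_signature_analyzer import FileSignatureAnalyzer

analyzer = FileSignatureAnalyzer(min_ngram=2, max_ngram=4, threshold=0.8)
sig1 = analyzer.extract_signatures(analyzer.load_file("file1.bin"))  # placeholder path
sig2 = analyzer.extract_signatures(analyzer.load_file("file2.bin"))  # placeholder path
similarities = analyzer.compare_signatures(sig1, sig2)
is_similar, confidence = analyzer.are_files_similar(similarities)
print("MATCH" if is_similar else "DIFFERENT", f"(confidence: {confidence:.2f})")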

1. Importing Modules

These modules provide functionality for file handling, mathematical operations, pattern matching, and data processing.


import os
import re
import math
import zlib
import argparse
import hashlib
import struct
from collections import Counter, defaultdict
import numpy as np
from typing import Dict, List, Tuple, Set, Any, Optional
from tqdm import tqdm
            

2. File Type Detector

A class that identifies file types from magic-byte signatures and lightweight content heuristics (such as the printable-byte ratio used for text detection).


class FileTypeDetector:
    MAGIC_SIGNATURES = {
        b'MZ': 'executable',         # DOS/PE stub
        b'PE\x00\x00': 'pe_header',
        b'\x7FELF': 'elf',
        b'\x89PNG': 'png',
        b'\xFF\xD8\xFF': 'jpeg',
        b'PK\x03\x04': 'zip',        # OOXML office documents share this signature
        b'%PDF': 'pdf',
        b'\xD0\xCF\x11\xE0': 'ole',  # legacy OLE compound documents (old Office formats)
    }
    @classmethod
    def detect_file_type(cls, data: bytes) -> Tuple[str, Dict[str, Any]]:
        file_type = "unknown"
        metadata = {}
        for signature, detected_type in cls.MAGIC_SIGNATURES.items():
            if data.startswith(signature):
                file_type = detected_type
                break
        if file_type == "executable":
            if len(data) > 0x3C:
                pe_offset = struct.unpack(" pe_offset + 4 and data[pe_offset:pe_offset+4] == b'PE\x00\x00':
                    file_type = "pe_executable"
                    metadata = cls._analyze_pe_file(data, pe_offset)
        if file_type == "unknown":
            if cls._is_likely_text(data):
                file_type = "text"
                if b'

3. Text Detection and PE Analysis

Methods to identify text files and analyze PE executable structures.


    @staticmethod
    def _is_likely_text(data: bytes) -> bool:
        sample = data[:4096]
        printable = sum(32 <= b <= 126 or b in (9, 10, 13) for b in sample)
        null_bytes = sample.count(0)
        return printable > len(sample) * 0.8 and null_bytes < len(sample) * 0.05

    @staticmethod
    def _analyze_pe_file(data: bytes, pe_offset: int) -> Dict[str, Any]:
        metadata = {"sections": []}
        try:
            if len(data) < pe_offset + 6 + 2:
                return metadata
            num_sections = struct.unpack("

4. File Signature Analyzer Class

The main class that manages file signature extraction and comparison.


class FileSignatureAnalyzer:
    def __init__(self, min_ngram: int = 1, max_ngram: int = 12, threshold: float = 0.76):
        self.min_ngram = min_ngram
        self.max_ngram = max_ngram
        self.threshold = threshold
        self.file_type_weights = {
            "default": {
                'entropy': 0.1,
                'byte_freq': 0.2,
                'avg_ngram': 0.4,
                'compression': 0.1,
                'patterns': 0.2
            },
            "pe_executable": {
                'entropy': 0.05,         
                'byte_freq': 0.1,        
                'avg_ngram': 0.25,       
                'compression': 0.05,     
                'section_similarity': 0.25, 
                'import_similarity': 0.15,  
                'string_similarity': 0.15   
            },
            "text": {
                'entropy': 0.05,
                'byte_freq': 0.1,
                'avg_ngram': 0.6,  
                'compression': 0.1,
                'patterns': 0.15
            }
        }

    def load_file(self, file_path: str) -> bytes:
        try:
            with open(file_path, 'rb') as f:
                return f.read()
        except Exception as e:
            print(f"Error reading file {file_path}: {e}")
            return b''
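To make the weighting scheme concrete, here is a small arithmetic sketch using the default weights; the per-metric similarity scores are hypothetical values chosen for illustration:

# Hypothetical per-metric scores for a pair of files of the default type.
scores = {'entropy': 0.9, 'byte_freq': 0.8, 'avg_ngram': 0.7, 'compression': 0.95, 'patterns': 0.6}
weights = {'entropy': 0.1, 'byte_freq': 0.2, 'avg_ngram': 0.4, 'compression': 0.1, 'patterns': 0.2}
overall = sum(scores[metric] * weight for metric, weight in weights.items())
print(f"overall = {overall:.3f}")  # 0.09 + 0.16 + 0.28 + 0.095 + 0.12 = 0.745

With the default threshold of 0.76, this hypothetical pair would fall just short and be reported as DIFFERENT.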
            

5. Signature Extraction

Methods to extract various signatures from file content.


    def extract_signatures(self, content: bytes) -> Dict[str, Any]:
        signatures = {}
        file_type, type_metadata = FileTypeDetector.detect_file_type(content)
        signatures['file_type'] = file_type
        signatures['type_metadata'] = type_metadata
        with tqdm(total=6, desc="Extracting signatures") as pbar:
            signatures['entropy'] = self._calculate_entropy(content)
            pbar.update(1)
            signatures['byte_freq'] = self._get_byte_frequency(content)
            pbar.update(1)
            signatures['ngrams'] = {}
            for n in range(self.min_ngram, self.max_ngram + 1):
                signatures['ngrams'][n] = self._extract_ngrams(content, n)
            pbar.update(1)
            signatures['compression_ratio'] = len(zlib.compress(content)) / max(1, len(content))
            pbar.update(1)
            signatures['byte_patterns'] = self._extract_byte_patterns(content)
            pbar.update(1)
            if file_type == "pe_executable":
                signatures['pe_specific'] = self._extract_pe_specific_features(content, type_metadata)
            pbar.update(1)
        return signatures

    def _calculate_entropy(self, data: bytes) -> float:
        if not data:
            return 0.0
        counter = Counter(data)
        total = len(data)
        entropy = 0.0
        for count in counter.values():
            probability = count / total
            entropy -= probability * math.log2(probability)
        return entropy

    def _get_byte_frequency(self, data: bytes) -> Dict[int, float]:
        counter = Counter(data)
        total = len(data)
        return {byte: count/total for byte, count in counter.items()}
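A quick sanity check of the entropy metric: a constant buffer scores 0 bits per byte, while a buffer uniform over all 256 byte values hits the 8-bit maximum that the comparison later normalizes against:

analyzer = FileSignatureAnalyzer()
print(analyzer._calculate_entropy(b'\x00' * 1024))         # 0.0: a single repeated byte carries no information
print(analyzer._calculate_entropy(bytes(range(256)) * 4))  # 8.0: uniform over all byte values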
            

6. N-gram and Pattern Analysis

Methods for extracting n-grams and detecting repeating byte patterns in the file content.


    def _extract_ngrams(self, data: bytes, n: int) -> Counter:
        ngrams = Counter()
        for i in tqdm(range(len(data) - n + 1), desc=f"Extracting {n}-grams", leave=False):
            ngram = data[i:i+n]
            ngrams[ngram] += 1
        return ngrams

    def _extract_byte_patterns(self, data: bytes) -> Dict[str, float]:
        patterns = {}
        repeats = {}
        max_sample_size = 1_000_000  # cap the number of sampled offsets so huge files stay tractable
        sample_step = max(1, len(data) // max_sample_size) if len(data) > max_sample_size else 1
        for size in tqdm(range(4, 12), desc="Extracting byte patterns", leave=False):
            chunk_counts = defaultdict(int)
            for i in range(0, len(data) - size, sample_step):
                chunk_counts[data[i:i+size]] += 1
            repeats[size] = sum(1 for count in chunk_counts.values() if count > 1)
        patterns['repeats'] = {size: count / max(1, (len(data) - size + 1) // sample_step) 
                               for size, count in repeats.items()}
        return patterns
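To verify the n-gram counting by hand, a toy buffer works well; overlapping windows mean a buffer of length 8 yields seven 2-grams:

analyzer = FileSignatureAnalyzer()
bigrams = analyzer._extract_ngrams(b'abcabcab', 2)
print(bigrams.most_common())  # [(b'ab', 3), (b'bc', 2), (b'ca', 2)]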
            

7. PE Specific Analysis

Methods to extract specific features from PE executable files, including sections, imports, and strings.


    def _extract_pe_specific_features(self, data: bytes, metadata: Dict[str, Any]) -> Dict[str, Any]:
        pe_features = {}
        pe_features['sections'] = {}
        for section in metadata.get('sections', []):
            pe_features['sections'][section['name']] = {
                'size': section['size'],
                'characteristics': section['characteristics']
            }
        pe_features['imports'] = self._extract_pe_imports(data)
        pe_features['strings'] = self._extract_strings(data)
        return pe_features

    def _extract_pe_imports(self, data: bytes) -> List[str]:
        imports = []
        pattern = re.compile(b'[A-Za-z0-9_]{4,32}')
        common_dlls = [b'kernel32', b'user32', b'gdi32', b'advapi32', b'ole32', b'shell32', 
                       b'ntdll', b'comctl32', b'wsock32', b'wininet']
        for dll in common_dlls:
            dll_pos = -1  # start at offset 0, since find() begins at dll_pos + 1
            while True:
                dll_pos = data.find(dll, dll_pos + 1)
                if dll_pos == -1:
                    break
                region = data[dll_pos:dll_pos + 200]
                for match in pattern.finditer(region):
                    func_name = match.group(0).decode('utf-8', errors='ignore')
                    if len(func_name) >= 4 and func_name not in imports:
                        imports.append(func_name)
        return imports

    def _extract_strings(self, data: bytes) -> List[str]:
        strings = []
        ascii_pattern = re.compile(b'[ -~]{4,}')
        for match in ascii_pattern.finditer(data):
            string = match.group(0).decode('ascii', errors='ignore')
            if len(string) >= 4:
                strings.append(string)
        for i in range(0, len(data) - 8, 2):
            if all(data[j] in range(32, 127) and data[j+1] == 0 for j in range(i, i+8, 2)):
                end = i
                while end < len(data) - 1 and data[end] in range(32, 127) and data[end+1] == 0:
                    end += 2
                if end - i >= 8:  # require at least four UTF-16 code units
                    strings.append(data[i:end:2].decode('ascii', errors='ignore'))
        return list(set(strings))  
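A hedged demonstration of the string scraper on a synthetic buffer mixing one ASCII string with one UTF-16LE string (real binaries interleave such strings with arbitrary bytes; note the sliding-window scan may also emit aligned suffixes such as 'ello'):

analyzer = FileSignatureAnalyzer()
buf = b'\x00\x01GetProcAddress\x00\x02' + 'Hello'.encode('utf-16-le') + b'\x00\x00'
print(sorted(analyzer._extract_strings(buf)))
# ['GetProcAddress', 'Hello', 'ello']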
            

8. Signature Comparison Methods

Methods to compare the extracted signatures, calculate similarity scores, and determine if files are similar.


    def compare_signatures(self, sig1: Dict, sig2: Dict) -> Dict[str, float]:
        similarities = {}
        file_type1 = sig1.get('file_type', 'unknown')
        file_type2 = sig2.get('file_type', 'unknown')
        print(f"File 1 type: {file_type1}")
        print(f"File 2 type: {file_type2}")
        file_type_match = file_type1 == file_type2
        file_type_penalty = 0.0 if file_type_match else 0.3
        with tqdm(total=6, desc="Comparing signatures") as pbar:
            entropy_diff = abs(sig1['entropy'] - sig2['entropy'])
            similarities['entropy'] = 1.0 - min(1.0, entropy_diff / 8.0)
            pbar.update(1)
            similarities['byte_freq'] = self._compare_distributions(sig1['byte_freq'], sig2['byte_freq'])
            pbar.update(1)
            similarities['ngrams'] = {}
            for n in range(self.min_ngram, self.max_ngram + 1):
                similarities['ngrams'][n] = self._compare_ngram_distributions(
                    sig1['ngrams'][n], sig2['ngrams'][n])
            similarities['avg_ngram'] = sum(similarities['ngrams'].values()) / len(similarities['ngrams'])
            pbar.update(1)
            comp_diff = abs(sig1['compression_ratio'] - sig2['compression_ratio'])
            similarities['compression'] = 1.0 - min(1.0, comp_diff)
            pbar.update(1)
            pattern_sim = self._compare_nested_dicts(sig1['byte_patterns'], sig2['byte_patterns'])
            similarities['patterns'] = pattern_sim
            pbar.update(1)
            if file_type1 == 'pe_executable' and file_type2 == 'pe_executable':
                pe_similarities = self._compare_pe_features(
                    sig1.get('pe_specific', {}), 
                    sig2.get('pe_specific', {})
                )
                similarities.update(pe_similarities)
            pbar.update(1)
        if file_type1 == 'pe_executable' and file_type2 == 'pe_executable':
            weights = self.file_type_weights.get('pe_executable', self.file_type_weights['default'])
        elif file_type1 == 'text' and file_type2 == 'text':
            weights = self.file_type_weights.get('text', self.file_type_weights['default'])
        else:
            weights = self.file_type_weights['default']
        weighted_scores = []
        for metric, weight in weights.items():
            if metric in similarities:
                weighted_scores.append(similarities[metric] * weight)
        if weighted_scores:
            similarities['overall'] = sum(weighted_scores)
            if not file_type_match:
                similarities['overall'] = max(0.0, similarities['overall'] - file_type_penalty)
                print(f"Applied file type mismatch penalty: {file_type_penalty}")
        else:
            similarities['overall'] = 0.0
        return similarities

    def _compare_pe_features(self, pe1: Dict, pe2: Dict) -> Dict[str, float]:
        pe_similarities = {}
        sections1 = set(pe1.get('sections', {}).keys())
        sections2 = set(pe2.get('sections', {}).keys())
        if sections1 and sections2:
            common_sections = sections1.intersection(sections2)
            all_sections = sections1.union(sections2)
            pe_similarities['section_similarity'] = len(common_sections) / max(1, len(all_sections))
            size_similarities = []
            for section in common_sections:
                size1 = pe1['sections'][section].get('size', 0)
                size2 = pe2['sections'][section].get('size', 0)
                if size1 > 0 and size2 > 0:
                    ratio = min(size1, size2) / max(size1, size2)
                    size_similarities.append(ratio)
            if size_similarities:
                pe_similarities['section_size_similarity'] = sum(size_similarities) / len(size_similarities)
        else:
            pe_similarities['section_similarity'] = 0.0
        imports1 = set(pe1.get('imports', []))
        imports2 = set(pe2.get('imports', []))
        if imports1 and imports2:
            common_imports = imports1.intersection(imports2)
            all_imports = imports1.union(imports2)
            pe_similarities['import_similarity'] = len(common_imports) / max(1, len(all_imports))
        else:
            pe_similarities['import_similarity'] = 0.0
        strings1 = set(pe1.get('strings', []))
        strings2 = set(pe2.get('strings', []))
        if strings1 and strings2:
            top_strings1 = set(sorted(strings1, key=len, reverse=True)[:100])
            top_strings2 = set(sorted(strings2, key=len, reverse=True)[:100])
            common_strings = top_strings1.intersection(top_strings2)
            all_strings = top_strings1.union(top_strings2)
            pe_similarities['string_similarity'] = len(common_strings) / max(1, len(all_strings))
        else:
            pe_similarities['string_similarity'] = 0.0
        return pe_similarities

    def _compare_distributions(self, dist1: Dict, dist2: Dict) -> float:
        keys = set(dist1.keys()) | set(dist2.keys())
        vec1 = np.array([dist1.get(k, 0.0) for k in keys])
        vec2 = np.array([dist2.get(k, 0.0) for k in keys])
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        vec1 = vec1 / norm1
        vec2 = vec2 / norm2
        return float(np.dot(vec1, vec2))

    def _compare_ngram_distributions(self, ngrams1: Counter, ngrams2: Counter) -> float:
        top_count = min(500, max(len(ngrams1), len(ngrams2)))
        top1 = set(gram for gram, _ in ngrams1.most_common(top_count))
        top2 = set(gram for gram, _ in ngrams2.most_common(top_count))
        if not top1 and not top2:
            return 1.0
        intersection = len(top1.intersection(top2))
        union = len(top1.union(top2))
        return intersection / max(1, union)

    def _compare_nested_dicts(self, dict1: Dict, dict2: Dict) -> float:
        if not isinstance(dict1, dict) or not isinstance(dict2, dict):
            if isinstance(dict1, (int, float)) and isinstance(dict2, (int, float)):
                return 1.0 - min(1.0, abs(dict1 - dict2))
            return 1.0 if dict1 == dict2 else 0.0
        keys = set(dict1.keys()) | set(dict2.keys())
        if not keys:
            return 1.0
        similarities = []
        for key in keys:
            if key in dict1 and key in dict2:
                sim = self._compare_nested_dicts(dict1[key], dict2[key])
                similarities.append(sim)
            else:
                similarities.append(0.0)
        return sum(similarities) / len(similarities) if similarities else 0.0

    def are_files_similar(self, similarities: Dict[str, float]) -> Tuple[bool, float]:
        overall = similarities['overall']
        is_similar = overall >= self.threshold
        if is_similar:
            confidence = (overall - self.threshold) / (1.0 - self.threshold)
        else:
            confidence = (self.threshold - overall) / self.threshold
        confidence = 0.5 + (confidence * 0.5)
        return is_similar, confidence
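Putting the pieces together, here is a hedged end-to-end sketch comparing two small in-memory buffers (toy inputs; in normal use the content comes from load_file). Both buffers are detected as text, so the text weight profile applies:

analyzer = FileSignatureAnalyzer(min_ngram=1, max_ngram=3)
data_a = b'The quick brown fox jumps over the lazy dog. ' * 40
data_b = b'The quick brown fox jumped over the lazy dogs! ' * 40
sims = analyzer.compare_signatures(
    analyzer.extract_signatures(data_a),
    analyzer.extract_signatures(data_b),
)
is_similar, confidence = analyzer.are_files_similar(sims)
print(is_similar, round(confidence, 2), round(sims['overall'], 4))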
            

9. Main Function

The entry point of the script, handling command-line arguments and orchestrating the analysis process.


def main():
    import time
    from datetime import datetime
    start_time = time.time()
    start_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Analysis started at: {start_datetime}")
    parser = argparse.ArgumentParser(description='Compare file signatures to detect similar files despite obfuscation.')
    parser.add_argument('file1', help='Path to first file')
    parser.add_argument('file2', help='Path to second file')
    parser.add_argument('--min-ngram', type=int, default=1, help='Minimum n-gram size')
    parser.add_argument('--max-ngram', type=int, default=12, help='Maximum n-gram size')
    parser.add_argument('--threshold', type=float, default=0.76, help='Similarity threshold (0.0-1.0)')
    parser.add_argument('--verbose', '-v', action='store_true', help='Show detailed similarity metrics')
    args = parser.parse_args()
    if not os.path.isfile(args.file1):
        print(f"Error: File '{args.file1}' does not exist")
        return 1
    if not os.path.isfile(args.file2):
        print(f"Error: File '{args.file2}' does not exist")
        return 1
    analyzer = FileSignatureAnalyzer(
        min_ngram=args.min_ngram,
        max_ngram=args.max_ngram,
        threshold=args.threshold
    )
    print(f"Analyzing {args.file1}...")
    content1 = analyzer.load_file(args.file1)
    sig1 = analyzer.extract_signatures(content1)
    print(f"Analyzing {args.file2}...")
    content2 = analyzer.load_file(args.file2)
    sig2 = analyzer.extract_signatures(content2)
    print("Comparing signatures...")
    similarities = analyzer.compare_signatures(sig1, sig2)
    is_similar, confidence = analyzer.are_files_similar(similarities)
    result = "MATCH" if is_similar else "DIFFERENT"
    print(f"\nResult: {result} (confidence: {confidence:.2f})")
    print(f"Overall similarity score: {similarities['overall']:.4f}")
    if args.verbose:
        print("\nDetailed metrics:")
        for metric, score in similarities.items():
            if isinstance(score, dict):
                print(f"  {metric}:")
                for submetric, subscore in score.items():
                    print(f"    {submetric}: {subscore:.4f}")
            else:
                print(f"  {metric}: {score:.4f}")
    end_time = time.time()
    end_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    duration = end_time - start_time
    print(f"\nAnalysis completed at: {end_datetime}")
    print(f"Total duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
    return 0

if __name__ == "__main__":
    exit(main())