This Python tool analyzes and compares file signatures to detect similarities between files despite obfuscation techniques. It features advanced n-gram analysis, entropy calculation, byte pattern detection, and specialized PE file analysis capabilities.
Follow these steps to set up and use the File Signature Analyzer:
1. Install the dependencies:
   pip install numpy tqdm
2. Save the script below as file_signature_analyzer.py.
3. Run a basic comparison:
   python file_signature_analyzer.py file1.bin file2.bin
4. Optionally tune the n-gram range and similarity threshold:
   python file_signature_analyzer.py file1.bin file2.bin --min-ngram 2 --max-ngram 8 --threshold 0.8 --verbose
Add the --verbose flag to see the detailed per-metric similarity scores.
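The analyzer can also be driven directly from Python instead of the command line. A minimal sketch, assuming the script below is saved as file_signature_analyzer.py on the import path and that file1.bin and file2.bin are placeholder paths (the settings shown are illustrative, not defaults):

from file_signature_analyzer import FileSignatureAnalyzer

analyzer = FileSignatureAnalyzer(min_ngram=1, max_ngram=4, threshold=0.8)  # illustrative settings
sig1 = analyzer.extract_signatures(analyzer.load_file("file1.bin"))
sig2 = analyzer.extract_signatures(analyzer.load_file("file2.bin"))
similarities = analyzer.compare_signatures(sig1, sig2)
is_similar, confidence = analyzer.are_files_similar(similarities)
print(is_similar, round(confidence, 2), round(similarities['overall'], 4))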
These modules provide functionality for file handling, mathematical operations, pattern matching, and data processing.
import os
import re
import math
import zlib
import argparse
import hashlib
import struct
from collections import Counter, defaultdict
import numpy as np
from typing import Dict, List, Tuple, Set, Any, Optional
from tqdm import tqdm
A class that identifies file types based on magic signatures and special patterns.
class FileTypeDetector:
MAGIC_SIGNATURES = {
b'MZ': 'executable',
b'PE\x00\x00': 'pe_header',
b'\x7FELF': 'elf',
b'\x89PNG': 'png',
b'\xFF\xD8\xFF': 'jpeg',
b'PK\x03\x04': 'zip',        # also matches OOXML Office documents, which are ZIP containers
b'%PDF': 'pdf',
b'\xD0\xCF\x11\xE0': 'ole',  # legacy OLE compound files (pre-2007 Office formats)
}
@classmethod
def detect_file_type(cls, data: bytes) -> Tuple[str, Dict[str, Any]]:
file_type = "unknown"
metadata = {}
for signature, detected_type in cls.MAGIC_SIGNATURES.items():
if data.startswith(signature):
file_type = detected_type
break
if file_type == "executable":
    if len(data) >= 0x40:
        # e_lfanew: little-endian 32-bit offset of the PE header, stored at 0x3C
        pe_offset = struct.unpack("<I", data[0x3C:0x40])[0]
        if len(data) > pe_offset + 4 and data[pe_offset:pe_offset + 4] == b'PE\x00\x00':
            file_type = "pe_executable"
            metadata = cls._analyze_pe_file(data, pe_offset)
if file_type == "unknown":
    if cls._is_likely_text(data):
        file_type = "text"
return file_type, metadata
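The "<I" and "<H" format strings used here and in the section parser below simply read little-endian 32-bit and 16-bit unsigned integers. A standalone sketch on made-up bytes:

import struct

buf = b'\x03\x00\x00\x10\x00\x00'
print(struct.unpack("<H", buf[0:2])[0])   # 3     (little-endian 16-bit, e.g. a section count)
print(struct.unpack("<I", buf[2:6])[0])   # 4096  (little-endian 32-bit, e.g. an offset or size)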
Methods to identify text files and analyze PE executable structures.
@staticmethod
def _is_likely_text(data: bytes) -> bool:
sample = data[:4096]
printable = sum(32 <= b <= 126 or b in (9, 10, 13) for b in sample)
null_bytes = sample.count(0)
return printable > len(sample) * 0.8 and null_bytes < len(sample) * 0.05
@staticmethod
def _analyze_pe_file(data: bytes, pe_offset: int) -> Dict[str, Any]:
    metadata = {"sections": []}
    try:
        if len(data) < pe_offset + 6 + 2:
            return metadata
        # NumberOfSections (16-bit) and SizeOfOptionalHeader (16-bit) from the COFF header
        num_sections = struct.unpack("<H", data[pe_offset + 6:pe_offset + 8])[0]
        opt_size = struct.unpack("<H", data[pe_offset + 20:pe_offset + 22])[0]
        table = pe_offset + 24 + opt_size
        # Each 40-byte section header carries the name, raw size, and characteristics flags
        for off in range(table, table + num_sections * 40, 40):
            if len(data) < off + 40:
                break
            metadata["sections"].append({
                "name": data[off:off + 8].rstrip(b'\x00').decode('ascii', errors='ignore'),
                "size": struct.unpack("<I", data[off + 16:off + 20])[0],
                "characteristics": struct.unpack("<I", data[off + 36:off + 40])[0]})
    except Exception:
        pass
    return metadata
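With the detector in place, a quick standalone check on synthetic inputs (illustrative byte strings, not real files) shows the expected classifications:

png_stub = b'\x89PNG\r\n\x1a\n' + b'\x00' * 64
print(FileTypeDetector.detect_file_type(png_stub)[0])                  # png
print(FileTypeDetector.detect_file_type(b'Just some plain text.')[0])  # text
print(FileTypeDetector.detect_file_type(b'\x00\x01\x02\x03')[0])       # unknown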
The main class that manages file signature extraction and comparison.
class FileSignatureAnalyzer:
def __init__(self, min_ngram: int = 1, max_ngram: int = 12, threshold: float = 0.76):
self.min_ngram = min_ngram
self.max_ngram = max_ngram
self.threshold = threshold
self.file_type_weights = {
"default": {
'entropy': 0.1,
'byte_freq': 0.2,
'avg_ngram': 0.4,
'compression': 0.1,
'patterns': 0.2
},
"pe_executable": {
'entropy': 0.05,
'byte_freq': 0.1,
'avg_ngram': 0.25,
'compression': 0.05,
'section_similarity': 0.25,
'import_similarity': 0.15,
'string_similarity': 0.15
},
"text": {
'entropy': 0.05,
'byte_freq': 0.1,
'avg_ngram': 0.6,
'compression': 0.1,
'patterns': 0.15
}
}
def load_file(self, file_path: str) -> bytes:
try:
with open(file_path, 'rb') as f:
return f.read()
except Exception as e:
print(f"Error reading file {file_path}: {e}")
return b''
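As a quick sanity check on the weight profiles defined in __init__ (a standalone sketch, not part of the tool): each profile sums to 1.0, so the weighted overall score stays in the 0-1 range when every metric is available.

analyzer = FileSignatureAnalyzer()
for name, profile in analyzer.file_type_weights.items():
    print(name, round(sum(profile.values()), 2))
# default 1.0, pe_executable 1.0, text 1.0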
Methods to extract various signatures from file content.
def extract_signatures(self, content: bytes) -> Dict[str, Any]:
signatures = {}
file_type, type_metadata = FileTypeDetector.detect_file_type(content)
signatures['file_type'] = file_type
signatures['type_metadata'] = type_metadata
with tqdm(total=6, desc="Extracting signatures") as pbar:
signatures['entropy'] = self._calculate_entropy(content)
pbar.update(1)
signatures['byte_freq'] = self._get_byte_frequency(content)
pbar.update(1)
signatures['ngrams'] = {}
for n in range(self.min_ngram, self.max_ngram + 1):
signatures['ngrams'][n] = self._extract_ngrams(content, n)
pbar.update(1)
signatures['compression_ratio'] = len(zlib.compress(content)) / max(1, len(content))
pbar.update(1)
signatures['byte_patterns'] = self._extract_byte_patterns(content)
pbar.update(1)
if file_type == "pe_executable":
signatures['pe_specific'] = self._extract_pe_specific_features(content, type_metadata)
pbar.update(1)
return signatures
def _calculate_entropy(self, data: bytes) -> float:
if not data:
return 0.0
counter = Counter(data)
total = len(data)
entropy = 0.0
for count in counter.values():
probability = count / total
entropy -= probability * math.log2(probability)
return entropy
def _get_byte_frequency(self, data: bytes) -> Dict[int, float]:
counter = Counter(data)
total = len(data)
return {byte: count/total for byte, count in counter.items()}
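To make the entropy metric concrete, here is the same Shannon entropy formula restated as a standalone snippet on two extreme inputs; it only illustrates the expected range of roughly 0 to 8 bits per byte:

import math
from collections import Counter

def shannon_entropy(data: bytes) -> float:
    counts = Counter(data)
    return -sum((c / len(data)) * math.log2(c / len(data)) for c in counts.values())

print(shannon_entropy(b'\x00' * 1024))     # 0.0 -> a single repeated byte value carries no information
print(shannon_entropy(bytes(range(256))))  # 8.0 -> all 256 byte values equally likely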
Methods for extracting n-grams and detecting repeating byte patterns in the file content.
def _extract_ngrams(self, data: bytes, n: int) -> Counter:
ngrams = Counter()
for i in tqdm(range(len(data) - n + 1), desc=f"Extracting {n}-grams", leave=False):
ngram = data[i:i+n]
ngrams[ngram] += 1
return ngrams
def _extract_byte_patterns(self, data: bytes) -> Dict[str, float]:
patterns = {}
repeats = {}
max_sample_size = 1_000_000  # cap on positions scanned per pattern size; larger inputs are subsampled
sample_step = max(1, len(data) // max_sample_size) if len(data) > max_sample_size else 1
for size in tqdm(range(4, 12), desc="Extracting byte patterns", leave=False):
seen_chunks = set()
repeats[size] = 0
chunk_dict = {}
for i in range(0, len(data) - size, sample_step):
chunk = data[i:i+size]
if chunk in chunk_dict:
chunk_dict[chunk] += 1
else:
chunk_dict[chunk] = 1
repeats[size] = sum(1 for count in chunk_dict.values() if count > 1)
patterns['repeats'] = {size: count / max(1, (len(data) - size + 1) // sample_step)
for size, count in repeats.items()}
return patterns
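A tiny illustration of the sliding-window n-gram extraction above (standalone; the input bytes are made up). Shared n-grams are what let two files score as similar even when bytes have been inserted or shifted elsewhere:

from collections import Counter

data = b'ABABAB'
bigrams = Counter(data[i:i + 2] for i in range(len(data) - 1))
print(bigrams)   # Counter({b'AB': 3, b'BA': 2})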
Methods to extract specific features from PE executable files, including sections, imports, and strings.
def _extract_pe_specific_features(self, data: bytes, metadata: Dict[str, Any]) -> Dict[str, Any]:
pe_features = {}
pe_features['sections'] = {}
for section in metadata.get('sections', []):
pe_features['sections'][section['name']] = {
'size': section['size'],
'characteristics': section['characteristics']
}
pe_features['imports'] = self._extract_pe_imports(data)
pe_features['strings'] = self._extract_strings(data)
return pe_features
def _extract_pe_imports(self, data: bytes) -> List[str]:
imports = []
pattern = re.compile(b'[A-Za-z0-9_]{4,32}')
common_dlls = [b'kernel32', b'user32', b'gdi32', b'advapi32', b'ole32', b'shell32',
b'ntdll', b'comctl32', b'wsock32', b'wininet']
for dll in common_dlls:
dll_pos = 0
while True:
dll_pos = data.find(dll, dll_pos + 1)
if dll_pos == -1:
break
region = data[dll_pos:dll_pos + 200]
for match in pattern.finditer(region):
func_name = match.group(0).decode('utf-8', errors='ignore')
if len(func_name) >= 4 and func_name not in imports:
imports.append(func_name)
return imports
def _extract_strings(self, data: bytes) -> List[str]:
strings = []
ascii_pattern = re.compile(b'[ -~]{4,}')
for match in ascii_pattern.finditer(data):
string = match.group(0).decode('ascii', errors='ignore')
if len(string) >= 4:
strings.append(string)
for i in range(0, len(data) - 8, 2):
if all(data[j] in range(32, 127) and data[j+1] == 0 for j in range(i, i+8, 2)):
end = i
while end < len(data) - 1 and data[end] in range(32, 127) and data[end+1] == 0:
end += 2
if end - i >= 8:
try:
string = data[i:end:2].decode('ascii', errors='ignore')
strings.append(string)
except Exception:
pass
return list(set(strings))
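The ASCII pattern above matches runs of four or more printable characters; a standalone illustration on a synthetic buffer:

import re

blob = b'\x00\x01GetProcAddress\x00\xffkernel32.dll\x00ok\x00'
print([m.group(0) for m in re.finditer(b'[ -~]{4,}', blob)])
# [b'GetProcAddress', b'kernel32.dll']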
Methods to compare the extracted signatures, calculate similarity scores, and determine if files are similar.
def compare_signatures(self, sig1: Dict, sig2: Dict) -> Dict[str, float]:
similarities = {}
file_type1 = sig1.get('file_type', 'unknown')
file_type2 = sig2.get('file_type', 'unknown')
print(f"File 1 type: {file_type1}")
print(f"File 2 type: {file_type2}")
file_type_match = file_type1 == file_type2
file_type_penalty = 0.0 if file_type_match else 0.3
with tqdm(total=6, desc="Comparing signatures") as pbar:
entropy_diff = abs(sig1['entropy'] - sig2['entropy'])
similarities['entropy'] = 1.0 - min(1.0, entropy_diff / 8.0)
pbar.update(1)
similarities['byte_freq'] = self._compare_distributions(sig1['byte_freq'], sig2['byte_freq'])
pbar.update(1)
similarities['ngrams'] = {}
for n in range(self.min_ngram, self.max_ngram + 1):
similarities['ngrams'][n] = self._compare_ngram_distributions(
sig1['ngrams'][n], sig2['ngrams'][n])
similarities['avg_ngram'] = sum(similarities['ngrams'].values()) / len(similarities['ngrams'])
pbar.update(1)
comp_diff = abs(sig1['compression_ratio'] - sig2['compression_ratio'])
similarities['compression'] = 1.0 - min(1.0, comp_diff)
pbar.update(1)
pattern_sim = self._compare_nested_dicts(sig1['byte_patterns'], sig2['byte_patterns'])
similarities['patterns'] = pattern_sim
pbar.update(1)
if file_type1 == 'pe_executable' and file_type2 == 'pe_executable':
pe_similarities = self._compare_pe_features(
sig1.get('pe_specific', {}),
sig2.get('pe_specific', {})
)
similarities.update(pe_similarities)
pbar.update(1)
if file_type1 == 'pe_executable' and file_type2 == 'pe_executable':
weights = self.file_type_weights.get('pe_executable', self.file_type_weights['default'])
elif file_type1 == 'text' and file_type2 == 'text':
weights = self.file_type_weights.get('text', self.file_type_weights['default'])
else:
weights = self.file_type_weights['default']
weighted_scores = []
for metric, weight in weights.items():
if metric in similarities:
weighted_scores.append(similarities[metric] * weight)
if weighted_scores:
similarities['overall'] = sum(weighted_scores)
if not file_type_match:
similarities['overall'] = max(0.0, similarities['overall'] - file_type_penalty)
print(f"Applied file type mismatch penalty: {file_type_penalty}")
else:
similarities['overall'] = 0.0
return similarities
def _compare_pe_features(self, pe1: Dict, pe2: Dict) -> Dict[str, float]:
pe_similarities = {}
sections1 = set(pe1.get('sections', {}).keys())
sections2 = set(pe2.get('sections', {}).keys())
if sections1 and sections2:
common_sections = sections1.intersection(sections2)
all_sections = sections1.union(sections2)
pe_similarities['section_similarity'] = len(common_sections) / max(1, len(all_sections))
size_similarities = []
for section in common_sections:
size1 = pe1['sections'][section].get('size', 0)
size2 = pe2['sections'][section].get('size', 0)
if size1 > 0 and size2 > 0:
ratio = min(size1, size2) / max(size1, size2)
size_similarities.append(ratio)
if size_similarities:
pe_similarities['section_size_similarity'] = sum(size_similarities) / len(size_similarities)
else:
pe_similarities['section_similarity'] = 0.0
imports1 = set(pe1.get('imports', []))
imports2 = set(pe2.get('imports', []))
if imports1 and imports2:
common_imports = imports1.intersection(imports2)
all_imports = imports1.union(imports2)
pe_similarities['import_similarity'] = len(common_imports) / max(1, len(all_imports))
else:
pe_similarities['import_similarity'] = 0.0
strings1 = set(pe1.get('strings', []))
strings2 = set(pe2.get('strings', []))
if strings1 and strings2:
top_strings1 = set(sorted(strings1, key=len, reverse=True)[:100])
top_strings2 = set(sorted(strings2, key=len, reverse=True)[:100])
common_strings = top_strings1.intersection(top_strings2)
all_strings = top_strings1.union(top_strings2)
pe_similarities['string_similarity'] = len(common_strings) / max(1, len(all_strings))
else:
pe_similarities['string_similarity'] = 0.0
return pe_similarities
def _compare_distributions(self, dist1: Dict, dist2: Dict) -> float:
keys = set(dist1.keys()) | set(dist2.keys())
vec1 = np.array([dist1.get(k, 0.0) for k in keys])
vec2 = np.array([dist2.get(k, 0.0) for k in keys])
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
vec1 = vec1 / norm1
vec2 = vec2 / norm2
return float(np.dot(vec1, vec2))
def _compare_ngram_distributions(self, ngrams1: Counter, ngrams2: Counter) -> float:
top_count = min(500, max(len(ngrams1), len(ngrams2)))
top1 = set(gram for gram, _ in ngrams1.most_common(top_count))
top2 = set(gram for gram, _ in ngrams2.most_common(top_count))
if not top1 and not top2:
return 1.0
intersection = len(top1.intersection(top2))
union = len(top1.union(top2))
return intersection / max(1, union)
def _compare_nested_dicts(self, dict1: Dict, dict2: Dict) -> float:
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
if isinstance(dict1, (int, float)) and isinstance(dict2, (int, float)):
return 1.0 - min(1.0, abs(dict1 - dict2))
return 1.0 if dict1 == dict2 else 0.0
keys = set(dict1.keys()) | set(dict2.keys())
if not keys:
return 1.0
similarities = []
for key in keys:
if key in dict1 and key in dict2:
sim = self._compare_nested_dicts(dict1[key], dict2[key])
similarities.append(sim)
else:
similarities.append(0.0)
return sum(similarities) / len(similarities) if similarities else 0.0
def are_files_similar(self, similarities: Dict[str, float]) -> Tuple[bool, float]:
overall = similarities['overall']
is_similar = overall >= self.threshold
if is_similar:
confidence = (overall - self.threshold) / (1.0 - self.threshold)
else:
confidence = (self.threshold - overall) / self.threshold
confidence = 0.5 + (confidence * 0.5)
return is_similar, confidence
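Two of the building blocks above are standard similarity measures: cosine similarity between frequency vectors (_compare_distributions) and Jaccard overlap between top n-gram sets (_compare_ngram_distributions). A standalone worked example with made-up values:

import numpy as np

freq1 = {65: 0.5, 66: 0.5}   # hypothetical byte-frequency distributions
freq2 = {65: 0.5, 67: 0.5}
keys = sorted(set(freq1) | set(freq2))
v1 = np.array([freq1.get(k, 0.0) for k in keys])
v2 = np.array([freq2.get(k, 0.0) for k in keys])
print(float(np.dot(v1 / np.linalg.norm(v1), v2 / np.linalg.norm(v2))))   # 0.5

top1, top2 = {b'AB', b'BC', b'CD'}, {b'AB', b'CD', b'DE'}
print(len(top1 & top2) / len(top1 | top2))                               # 0.5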
The entry point of the script, handling command-line arguments and orchestrating the analysis process.
def main():
import time
from datetime import datetime
start_time = time.time()
start_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Analysis started at: {start_datetime}")
parser = argparse.ArgumentParser(description='Compare file signatures to detect similar files despite obfuscation.')
parser.add_argument('file1', help='Path to first file')
parser.add_argument('file2', help='Path to second file')
parser.add_argument('--min-ngram', type=int, default=1, help='Minimum n-gram size')
parser.add_argument('--max-ngram', type=int, default=12, help='Maximum n-gram size')
parser.add_argument('--threshold', type=float, default=0.76, help='Similarity threshold (0.0-1.0)')
parser.add_argument('--verbose', '-v', action='store_true', help='Show detailed similarity metrics')
args = parser.parse_args()
if not os.path.isfile(args.file1):
print(f"Error: File '{args.file1}' does not exist")
return 1
if not os.path.isfile(args.file2):
print(f"Error: File '{args.file2}' does not exist")
return 1
analyzer = FileSignatureAnalyzer(
min_ngram=args.min_ngram,
max_ngram=args.max_ngram,
threshold=args.threshold
)
print(f"Analyzing {args.file1}...")
content1 = analyzer.load_file(args.file1)
sig1 = analyzer.extract_signatures(content1)
print(f"Analyzing {args.file2}...")
content2 = analyzer.load_file(args.file2)
sig2 = analyzer.extract_signatures(content2)
print("Comparing signatures...")
similarities = analyzer.compare_signatures(sig1, sig2)
is_similar, confidence = analyzer.are_files_similar(similarities)
result = "MATCH" if is_similar else "DIFFERENT"
print(f"\nResult: {result} (confidence: {confidence:.2f})")
print(f"Overall similarity score: {similarities['overall']:.4f}")
if args.verbose:
print("\nDetailed metrics:")
for metric, score in similarities.items():
if isinstance(score, dict):
print(f" {metric}:")
for submetric, subscore in score.items():
print(f" {submetric}: {subscore:.4f}")
else:
print(f" {metric}: {score:.4f}")
end_time = time.time()
end_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
duration = end_time - start_time
print(f"\nAnalysis completed at: {end_datetime}")
print(f"Total duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
return 0
if __name__ == "__main__":
exit(main())