Source code for streamcorpus_pipeline._dedup

'''
Simple dedup tools that reject StreamItems that have a previously seen
doc_id or a similar nilsimsa hash.

This software is released under an MIT/X11 open source license.

Copyright 2012-2014 Diffeo, Inc.
'''
from __future__ import absolute_import
import logging
import os
import sys

import nilsimsa

from streamcorpus_pipeline.stages import Configured

logger = logging.getLogger(__name__)

class dedup(Configured):
    '''Stage that drops StreamItems duplicating one already seen.

    Every item that is kept is cached by ``doc_id`` for the lifetime
    of this object, together with its stream_id, abs_url, nilsimsa
    hexdigest (or None), content and raw length.  Later items are
    compared against that cache.

    Configuration keys read by this stage:

    * ``content_form``: name of the ``si.body`` attribute to compare
    * ``use_nilsimsa``: whether nilsimsa hashes are computed/compared
    * ``require_same_doc_id``: if false (and ``use_nilsimsa`` is
      true), also compare against every cached doc, not only the one
      with the same doc_id
    * ``exactness_nilsimsa_threshold``: nilsimsa score at or above
      which two docs count as the same
    * ``log_nilsimsa_threshold``: nilsimsa score at or above which a
      cross-doc comparison is logged
    * ``min_clean_length``: minimum content length for a same-doc_id
      nilsimsa rejection
    * ``min_len_sim_thousandths_clean`` and
      ``min_len_sim_thousandths_raw``: minimum length similarity, in
      thousandths, for content and for raw respectively
    * ``log_dir_path`` (optional): directory that receives a dump of
      each logged cross-doc pair
    '''
    config_name = 'dedup'

    def __init__(self, *args, **kwargs):
        super(dedup, self).__init__(*args, **kwargs)
        ## keep a mapping from doc_id to
        ## (stream_id, abs_url, nilsimsa hexdigest, content, len_raw)
        self._doc_ids = dict()
        ## number of items passed to __call__
        self._count = 0

    @staticmethod
    def _len_sim(len1, len2):
        '''Return 1 - |len1 - len2| / max(len1, len2) as a float.

        The caller must make sure at least one length is non-zero.
        '''
        return 1 - float(abs(len1 - len2)) / max(len1, len2)

    @staticmethod
    def _language_code(si):
        '''Return si.body.language.code, or None if either is falsy.'''
        language = si.body.language
        if language:
            return language.code or None
        return None

    @staticmethod
    def _write_pair(log_dir, sim, new_doc_id, old_doc_id, content, content2):
        '''Dump two similar documents into one file under log_dir.

        The directory is created if it does not exist yet.
        '''
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        path = os.path.join(
            log_dir, '%d-%s-%s.html' % (sim, new_doc_id, old_doc_id))
        ## the context manager closes the handle even if a write fails
        with open(path, 'wb') as fh:
            fh.write(content)
            fh.write('\n\n---NEW DOC --\n\n')
            fh.write(content2)

    def __call__(self, si, context):
        '''Return si if it should be kept, or None to reject it.

        :param si: item to test; this method reads si.doc_id,
          si.stream_id, si.abs_url and si.body
        :param context: unused by this stage
        :returns: si itself when it is kept (it is then cached for
          later comparisons), None when it is judged a duplicate
        '''
        self._count += 1
        nil = None

        ## get the desired content from the text
        content = getattr(si.body, self.config['content_form'], '')
        if content is None:
            content = ''
        ## raw gets the same None guard as content, so that a body
        ## without raw data counts as zero length instead of raising
        raw = getattr(si.body, 'raw', '')
        len_raw = 0 if raw is None else len(raw)

        ## we will always reject if the doc_id is the same and the
        ## sim and len requirements pass
        if si.doc_id in self._doc_ids:
            stream_id2, abs_url2, nil2, content2, len_raw2 = \
                self._doc_ids[si.doc_id]

            if not (content and content2):
                ## fall back to using raw
                if max(len_raw, len_raw2) == 0:
                    ## reject if the length of everything is zero
                    return None
                len_sim_raw = self._len_sim(len_raw, len_raw2)
                min_raw = self.config['min_len_sim_thousandths_raw']
                if 1000 * len_sim_raw >= min_raw:
                    logger.info(
                        'rejecting same doc %s, no si.body.%s, len_raw_sim_frac=%d >= %d=min_len_sim_thousandths_raw, lang=%r',
                        si.stream_id, self.config['content_form'],
                        1000 * len_sim_raw, min_raw,
                        self._language_code(si))
                    return None
                else:
                    ## reached only when similarity is below the minimum
                    logger.info(
                        'keeping doc %s (same page as %s), no si.body.%s, len_raw_sim_frac=%d < %d=min_len_sim_thousandths_raw',
                        si.stream_id, stream_id2,
                        self.config['content_form'],
                        1000 * len_sim_raw, min_raw)

            elif not self.config['use_nilsimsa']:
                ## both contents are non-empty here, so max(...) > 0
                len_sim = self._len_sim(len(content), len(content2))
                if 1000 * len_sim >= self.config['min_len_sim_thousandths_clean']:
                    ## the "+/-" slot reports the absolute length
                    ## difference, as the cross-doc message below does
                    logger.info(
                        'rejecting same doc_id sim=not-computed, len=(%d +/- %d) %s %s %r %r',
                        len(content), abs(len(content) - len(content2)),
                        si.stream_id, stream_id2, si.abs_url, abs_url2)
                    ## reject it!
                    return None

            else:
                ## compute and compare the nilsimsa hashes
                nil = nilsimsa.Nilsimsa(content).hexdigest()
                sim = nilsimsa.compare_digests(nil, nil2)

                if self.config['exactness_nilsimsa_threshold'] <= sim:
                    if self.config['min_clean_length'] <= len(content):
                        len_sim = self._len_sim(len(content), len(content2))
                        if 1000 * len_sim >= self.config['min_len_sim_thousandths_clean']:
                            logger.info(
                                'rejecting same doc_id sim=%d len=(%d +/- %d) %s %r',
                                sim, len(content),
                                abs(len(content) - len(content2)),
                                si.doc_id, si.abs_url)
                            ## reject it!
                            return None

        if content and not self.config['require_same_doc_id'] and self.config['use_nilsimsa']:
            if not nil:
                nil = nilsimsa.Nilsimsa(content).hexdigest()

            for doc_id, (_, abs_url2, nil2, content2, _) in self._doc_ids.items():
                sim = nilsimsa.compare_digests(nil, nil2)

                ## NOTE(review): the nesting here was reconstructed
                ## from a listing that had lost its indentation; the
                ## reject/observe decision is assumed to sit inside
                ## the log-threshold check -- confirm against upstream
                if sim >= self.config['log_nilsimsa_threshold']:
                    if 'log_dir_path' in self.config:
                        ## write to disk
                        self._write_pair(
                            self.config['log_dir_path'], sim,
                            si.doc_id, doc_id, content, content2)

                    if sim >= self.config['exactness_nilsimsa_threshold']:
                        len_sim = abs(len(content) - len(content2))
                        logger.info(
                            'rejecting sim=%d len=(%d +/- %d) %s %s %r %r',
                            sim, len(content), len_sim,
                            doc_id, si.doc_id, abs_url2, si.abs_url)
                        return None
                    else:
                        logger.info(
                            'observed sim=%d %s %s %r %r',
                            sim, doc_id, si.doc_id, abs_url2, si.abs_url)

        ## not rejecting, so keep data for further comparisons within
        ## this pipeline run
        if self.config['use_nilsimsa']:
            if not nil:
                nil = nilsimsa.Nilsimsa(content).hexdigest()
        else:
            nil = None
        logger.debug(
            'dedup caching %s len(content) = %s, language=%r',
            si.stream_id, len(content), self._language_code(si))
        self._doc_ids[si.doc_id] = (
            si.stream_id, si.abs_url, nil, content, len_raw)

        return si