Source code for streamcorpus_pipeline._serif

#!/usr/bin/env python
'''streamcorpus_pipeline.BatchTransform for Serif

.. This software is released under an MIT/X11 open source license.
   Copyright 2012-2015 Diffeo, Inc.

'''
from __future__ import absolute_import
import gc
import logging
import os
import shutil
import subprocess
import time
import traceback
import uuid

from yakonfig import ConfigurationError

from streamcorpus_pipeline.stages import BatchTransform
from streamcorpus_pipeline._taggers import make_memory_info_msg, align_labels
from streamcorpus_pipeline._exceptions import PipelineOutOfMemory, \
    PipelineBaseException

logger = logging.getLogger(__name__)


class serif(BatchTransform):
    '''Batch transform to parse and tag documents with Serif.

    Serif must be installed externally, somewhere underneath the
    directory configured as `third_dir_path` in the top-level pipeline
    configuration.  A typical Serif pipeline configuration looks like:

    .. code-block:: yaml

        streamcorpus_pipeline:
          tmp_dir_path: tmp
          third_dir_path: /opt/diffeo/third
          reader: ...
          incremental_transforms:
            - language
            - guess_media_type
            - clean_html
            - clean_visible
          batch_transforms: [serif]
          writers: [...]
          serif:
            path_in_third: serif/serif-latest
            par: streamcorpus_one_step
            align_labels_by: names_in_chains
            aligner_data:
              chain_selector: ALL
              annotator_id: annotator

    Configuration options include:

    `path_in_third` (required)
      Relative path in `third_dir_path` to the directory containing the
      Serif data directories

    `serif_exe` (default: bin/x86_64/Serif)
      Relative path within `path_in_third` to the Serif executable file

    `par` (default: streamcorpus_one_step)
      Serif policy configuration; this is typically
      ``streamcorpus_one_step``, but may also be
      ``streamcorpus_read_serifxml`` or ``streamcorpus_generate_serifxml``

    `par_additions`
      Map from policy configuration name to a list of strings to append
      as additional lines to customize the policy

    `cleanup_tmp_files` (default: true)
      Delete the intermediate files used by Serif

    The two "align" options, `align_labels_by` and `aligner_data`,
    control how ratings on the document are associated with the tokens
    generated by Serif.

    '''
    config_name = 'serif'
    tagger_id = 'serif'

    default_config = {
        'path_in_third': 'serif/serif-latest',
        'par': 'streamcorpus_one_step',
        'par_additions': {},
        'cleanup_tmp_files': True,
        'serif_exe': 'bin/x86_64/Serif',
    }

    @staticmethod
    def check_config(config, name):
        # We expect the pipeline to force in tmp_dir_path and
        # third_dir_path; but these are required
        for k in ['path_in_third', 'serif_exe', 'par',
                  'cleanup_tmp_files']:
            if k not in config:
                raise ConfigurationError('{0} requires {1} setting'
                                         .format(name, k))

    # These two serif parameter files came from BBN via email;
    # the full docs are in ../../../docs/serif*txt
    serif_global_config = '''
#######################################################################
## Serif Global Configuration
#######################################################################
##
## This file tells Serif where it can find trained model files and the
## scripts that are used for scoring.  It is imported by each of the
## other parameter files (such as all.best-english.par).  It is not
## intended to be used directly.
##
## This file needs to be updated if Serif is moved or copied to a new
## location.  No other files should need to be changed.
##
#######################################################################

serif_home: ..

serif_data: %serif_home%/data
serif_score: %serif_home%/scoring
server_docs_root: %serif_home%/doc/http_server

use_feature_module_BasicCipherStream: true
cipher_stream_always_decrypt: true

use_feature_module_KbaStreamCorpus: true
use_feature_module_HTMLDocumentReader: true
html_character_entities: %serif_data%/unspec/misc/html-character-entities.listmap

log_threshold: ERROR

#document_split_strategy: region
#document_splitter_max_size: 20000
'''

    streamcorpus_one_step = '''
### WARNING: this was written by the streamcorpus_pipeline, and will be
### overwritten the next time the streamcorpus_pipeline runs.

# Usage: Serif streamcorpus_one_step.par CHUNK_FILES -o OUT_DIR
#
# This parameter file is used for running SERIF from start-to-end on
# streamcorpus chunk files.
#
# Using this parameter file, SERIF will read each of the specified
# chunk files, and will generate corresponding chunk files in the
# directory OUT_DIR/output.  Stream items whose language is English
# (or unspecified) will be augmented with SERIF annotations.  The
# input text for SERIF will be read from clean_html (when non-empty)
# or from clean_visible (otherwise).

INCLUDE {serif_home_par}/config.par
INCLUDE {serif_home_par}/master.english.par
INCLUDE {serif_home_par}/master.english-speed.par

OVERRIDE use_feature_module_KbaStreamCorpus: true
OVERRIDE use_stream_corpus_driver: true
OVERRIDE start_stage: START
OVERRIDE end_stage: output
OVERRIDE kba_write_results_to_chunk: true
OVERRIDE parser_skip_unimportant_sentences: false
OVERRIDE input_type: rawtext
OVERRIDE source_format: sgm
OVERRIDE document_split_strategy: region
OVERRIDE document_splitter_max_size: 20000

#OVERRIDE log_force: kba-stream-corpus,profiling
#OVERRIDE kba_skip_docs_with_empty_language_code: false
'''

    streamcorpus_read_serifxml = '''
### WARNING: this was written by the streamcorpus_pipeline, and will be
### overwritten the next time the streamcorpus_pipeline runs.

# Usage: Serif streamcorpus_read_serifxml.par CHUNK_FILES -o OUT_DIR
#
# This parameter file is used in cases where SERIF has already been
# run on each stream item, and the SerifXML is stored in the tagging.
# This version does not need to load any SERIF models, since it
# expects that all serif processing has already been done.
#
# Using this parameter file, SERIF will read each of the specified
# chunk files, and will generate corresponding chunk files in the
# directory OUT_DIR/output.  Stream items whose language is English
# (or unspecified) are expected to contain a value for the field
# StreamItem.body.taggings['serif'].raw_tagging, containing fully
# processed SerifXML output for that item.  This SerifXML output will
# be read, and then corresponding sentence and relation annotations
# will be added to the output files.

INCLUDE {serif_home_par}/config.par
INCLUDE {serif_home_par}/master.english.par
INCLUDE {serif_home_par}/master.english-speed.par

OVERRIDE use_feature_module_KbaStreamCorpus: true
OVERRIDE use_stream_corpus_driver: true
OVERRIDE start_stage: output
OVERRIDE end_stage: output
OVERRIDE kba_read_serifxml_from_chunk: true
OVERRIDE kba_write_results_to_chunk: true
OVERRIDE parser_skip_unimportant_sentences: false
OVERRIDE source_format: serifxml
'''

    streamcorpus_generate_serifxml = '''
### WARNING: this was written by the streamcorpus_pipeline, and will be
### overwritten the next time the streamcorpus_pipeline runs.

# Usage: Serif streamcorpus_generate_serifxml.par CHUNK_FILES -o OUT_DIR
#
# This parameter file is identical to streamcorpus_one_step.par, except
# that: (1) it saves the serifxml output to the stream items; and (2)
# it does *not* save the sentence & relation annotations.  I used this
# parameter file to generate test inputs for the
# streamcorpus_read_serifxml.par parameter file.
# (streamcorpus_one_step.par is defined above)
INCLUDE ./streamcorpus_one_step.par

OVERRIDE kba_write_results_to_chunk: false
OVERRIDE kba_write_serifxml_to_chunk: true
'''

    def __init__(self, *args, **kwargs):
        super(serif, self).__init__(*args, **kwargs)
        self.tagger_root_path = os.path.join(self.config['third_dir_path'],
                                             self.config['path_in_third'])
        self._child = None

    def _write_config_par(self, tmp_dir, par_file):
        if par_file == 'streamcorpus_generate_serifxml':
            # streamcorpus_generate_serifxml INCLUDEs streamcorpus_one_step
            self._write_config_par(tmp_dir, 'streamcorpus_one_step')
        par_data = getattr(self, par_file)
        for line in self.config.get('par_additions', {}).get(par_file, []):
            par_data += '\n'
            par_data += line + '\n'
        fpath = os.path.join(tmp_dir, par_file + '.par')
        fout = open(fpath, 'wb')
        fout.write(
            par_data.format(
                serif_home_par=os.path.join(self.tagger_root_path, 'par')
            )
        )
        fout.close()
        logger.debug('wrote %s (%s bytes) to %r',
                     par_file, len(par_data), fpath)
        return fpath

    def process_path(self, chunk_path):
        tmp_dir = os.path.join(self.config['tmp_dir_path'], str(uuid.uuid4()))
        os.mkdir(tmp_dir)

        par_file = self.config['par']
        par_path = self._write_config_par(tmp_dir, par_file)

        tmp_chunk_path = os.path.join(tmp_dir, 'output',
                                      os.path.basename(chunk_path))

        cmd = [
            os.path.join(self.tagger_root_path, self.config['serif_exe']),
            par_path,
            '-o', tmp_dir,
            chunk_path,
        ]

        logger.info('serif cmd: %r', cmd)

        start_time = time.time()
        # make sure we are using as little memory as possible
        gc.collect()
        try:
            self._child = subprocess.Popen(cmd, stderr=subprocess.PIPE,
                                           shell=False)
        except OSError:
            logger.error('error running serif cmd %r', cmd, exc_info=True)
            msg = traceback.format_exc()
            msg += make_memory_info_msg()
            logger.critical(msg)
            raise

        s_out, errors = self._child.communicate()

        if self._child.returncode != 0:
            if self._child.returncode == 137:
                msg = 'tagger returncode = 137\n' + errors
                msg += make_memory_info_msg()
                # maybe get a tail of /var/log/messages
                raise PipelineOutOfMemory(msg)
            elif 'Exception' in errors:
                logger.error('child code %s errorlen=%s',
                             self._child.returncode, len(errors))
                raise PipelineBaseException(errors)
            else:
                raise PipelineBaseException('tagger exited with %r'
                                            % self._child.returncode)

        # generated new tokens, so align labels with them
        align_labels(tmp_chunk_path, self.config)

        if self.config.get('cleanup_tmp_files', True):
            # default: clean up the tmp directory
            os.rename(tmp_chunk_path, chunk_path)
            shutil.rmtree(tmp_dir)
        else:
            # for development: no cleanup, leave the tmp files in place
            chunk_path_save = ('{0}_pre_serif_{1}'
                               .format(chunk_path,
                                       os.path.getmtime(chunk_path)))
            os.rename(chunk_path, chunk_path_save)
            shutil.copy(tmp_chunk_path, chunk_path)

        elapsed = time.time() - start_time
        logger.info('finished tagging in %.1f seconds', elapsed)
        return elapsed

    def shutdown(self):
        '''Send SIGTERM to the tagger child process.'''
        if self._child:
            try:
                self._child.terminate()
            except OSError as exc:
                if exc.errno == 3:
                    # errno 3 is ESRCH: the child is already gone,
                    # possibly because it ran out of memory and caused
                    # us to shut down
                    pass
                else:
                    raise
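
# ----------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module above): driving the
# serif stage by hand outside the full pipeline.  This assumes that
# BatchTransform.__init__ stores the config dict it is given as
# self.config, as __init__ above relies on; tmp_dir_path and
# third_dir_path are normally injected by the pipeline, and the install
# root and chunk file path shown here are hypothetical.

if __name__ == '__main__':
    config = dict(serif.default_config)
    config['tmp_dir_path'] = 'tmp'                  # normally set by the pipeline
    config['third_dir_path'] = '/opt/diffeo/third'  # hypothetical install root
    # align_labels() consumes these, mirroring the docstring example
    config['align_labels_by'] = 'names_in_chains'
    config['aligner_data'] = {'chain_selector': 'ALL',
                              'annotator_id': 'annotator'}

    stage = serif(config)
    try:
        # rewrites the chunk file in place with Serif taggings
        elapsed = stage.process_path('tmp/example-chunk.sc')  # hypothetical chunk
        logger.info('stand-alone serif run took %.1f seconds', elapsed)
    finally:
        stage.shutdown()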