3. streamcorpus_pipeline — stream item processing framework

Stream item processing framework.

The streamcorpus pipeline is a basic framework for processing streamcorpus.StreamItem objects. Stream items are passed through the pipeline, with successive stages incrementally adding parts to each item. The general flow of the pipeline is as follows:

  1. A reader stage produces an initial set of stream items, one per input document. Many readers, such as from_local_chunks and from_s3_chunks, expect to take in streamcorpus.Chunk files with fully populated stream items. yaml_files_list takes a listing of plain external files from a YAML file. Other processing may require writing a custom reader stage. In the most basic case, this produces a stream item with only the body.raw field set.
  2. A set of transforms annotates and fills in parts of the stream item. If the inbound documents contain only raw content, it is common to run language to detect the language, and to run dedicated stages to produce clean_html and clean_visible forms of the body. Annotators such as hyperlink_labels will add labels at known offsets within the entire document.
  3. A tokenizer reads the actual text and parses it into parts of speech, creating token objects in the stream item. A tokenizer is not included in this package, but there are bundled stages to run nltk_tokenizer, lingpipe, and serif.
  4. An aligner reads labels at known offsets in the document and the output of the tokenizer and attempts to move the labels to be attached to their corresponding tokens. The aligner is generally run as part of the tokenizer rather than as a separate stage.
  5. Additional transforms could be run at this point on the tokenized output, though this is rare.
  6. A writer places the fully processed stream items somewhere. to_local_chunks is useful for local testing and development; networked storage via to_kvlayer or to_s3_chunks is helpful for publishing the chunk files to other developers and tools.
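The six steps above can be sketched as a toy loop. This is illustrative only: the real pipeline operates on streamcorpus.StreamItem thrift objects, and the stage functions here are stand-ins, not the package's actual stages.

```python
# Toy sketch of the flow above; dicts stand in for StreamItem objects.

def reader(paths):
    # Step 1: one stream item per input document, only body.raw set.
    for path in paths:
        yield {'abs_url': path, 'body': {'raw': 'raw text of ' + path}}

def detect_language(si):
    # Step 2: an incremental transform fills in more of the item.
    si['body']['language'] = 'en'
    return si

def tokenize(si):
    # Steps 3-4: a tokenizer (and aligner) add token-level structure.
    si['body']['tokens'] = si['body']['raw'].split()
    return si

def run_pipeline(paths, writer):
    # Steps 5-6: remaining transforms, then a writer stores the items.
    for si in reader(paths):
        for stage in (detect_language, tokenize):
            si = stage(si)
            if si is None:      # a transform may drop an item entirely
                break
        else:
            writer.append(si)

out = []
run_pipeline(['doc1', 'doc2'], out)
```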

3.1. streamcorpus_pipeline

streamcorpus_pipeline is the command-line entry point to the pipeline.

The supplied configuration must have a streamcorpus_pipeline section. Other required configuration depends on the specific stages in use and the rest of the operating environment; for instance, if the to_kvlayer writer stage is in use, then the configuration must also contain a top-level kvlayer section.

This supports the standard --config, --dump-config, --verbose, --quiet, and --debug options. It supports the following additional options:

--input <file.sc>, -i <file.sc>

Names the input file. If the file name is -, standard input is used where that makes sense. Some reader stages may interpret the file name specially.

--in-glob <pattern>

Runs the pipeline once for each file matching the shell glob pattern.

3.2. streamcorpus_directory

Process entire directories using streamcorpus_pipeline.

streamcorpus_directory reads an entire directory through the streamcorpus pipeline. This allows work to be distributed using the rejester distributed work system, with an implied assumption that all systems running a rejester worker for the given namespace have access to the local file storage space, either because the only worker is on the local system or because the named directory is on a shared networked filesystem.

The command is followed by a list of directory names, or other inputs. By default each named directory is scanned, and each file in each directory is submitted as an input to the pipeline. A rejester job is created for each file.

Note that rejester is typically used to distribute jobs across a cluster of systems. This program only submits the file names as jobs; it does not submit actual file contents. If the pipeline is configured with a reader such as from_s3_chunks, this tool can be used to submit a sequence of S3 source paths, either on the command line directly using the --files option, or by collecting the paths into a text file and using --file-lists. If the pipeline is configured with a reader such as from_local_chunks, the caller must ensure that every system running a worker process has a copy of the source files.

This supports the standard --config, --dump-config, --verbose, --quiet, and --debug options. It supports the following additional options:


--directories

The positional arguments are interpreted as directory names. Each named directory is scanned recursively, and every file in or under that directory is used as an input to the pipeline. This is the default mode.


--files

The positional arguments are interpreted as file names. Each named file is used as an input to the pipeline.


--file-lists

The positional arguments are file names. Each named file is a text file containing a list of input filenames or other sources, one per line. These are used as inputs to the pipeline.


--standalone

Process all of the files in the directory sequentially, then exit. This can be more convenient when debugging the pipeline configuration. Does not require rejester configuration.


--rejester

Create rejester work units, then exit. The configuration must contain a complete rejester configuration. This is the default mode.

--work-spec <name>

Use name as the name of the rejester work spec. Defaults to streamcorpus_directory.

--nice <n>

Deprioritize the submitted jobs in rejester. n can be a relative score between 0 and 20.

3.3. Pipeline configuration and execution

Configuration and execution of the actual pipeline.

The Pipeline itself consists of a series of stages. These are broken into several categories:

  • Exactly one reader runs first, producing a sequence of streamcorpus.StreamItem objects.
  • The stream items are fed through incremental transforms, which take one stream item in and produce one stream item out.
  • All of the stream items are written to a file, and batch transforms can operate on the entire collection of stream items at once.
  • Post-batch incremental transforms again operate on individual stream items.
  • Some number of writers send the final output somewhere.

3.3.1. Configuration

The streamcorpus_pipeline tool expects configuration in a YAML file. The configuration resulting from this can be passed through PipelineFactory to create Pipeline objects. A typical configuration looks like:

  # Lists of stages
  reader: from_local_chunks
  incremental_transforms: [clean_html, language]
  batch_transforms: []
  post_batch_incremental_transforms: []
  # to_local_chunks must be the last writer if it is used
  writers: [to_local_chunks]

  # Configuration for specific stages
  clean_html:
    include_language_codes: [en]

The streamcorpus_pipeline block can additionally be configured with:

root_path: /home/user/diffeo

Any configuration variable whose name ends in “path” and whose value is not an absolute path is interpreted relative to this directory. If omitted, the current directory is used.
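The “path” rule amounts to something like the following sketch. This is an illustration of the described behavior, not the pipeline's actual code; resolve_paths is a hypothetical helper.

```python
import os

def resolve_paths(config, root_path=None):
    """Make every relative *path value absolute w.r.t. root_path."""
    root = root_path or os.getcwd()
    resolved = dict(config)
    for key, value in config.items():
        if (key.endswith('path') and isinstance(value, str)
                and not os.path.isabs(value)):
            resolved[key] = os.path.join(root, value)
    return resolved

resolved = resolve_paths({'tmp_dir_path': 'tmp', 'third_dir_path': '/third'},
                         root_path='/home/user/diffeo')
```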

tmp_dir_path: directory

Intermediate files are stored in a subdirectory of directory. The pipeline execution will make an effort to clean this up. If omitted, defaults to /tmp.

third_dir_path: directory

External packages such as NLP taggers are stored in subdirectories of directory; these are typically individually configured with path_in_third options relative to this directory.

rate_log_interval: 500

When this many items have been processed, log an INFO-level log message giving the current progress.

input_item_limit: 500

Stop processing after reading this many stream items. (Default: process the entire stream)

cleanup_tmp_files: true

After execution finishes, delete the per-execution subdirectory of tmp_dir_path.

assert_single_source: false

Normally a set of stream items has a consistent streamcorpus.StreamItem.source value, and the pipeline framework will stop if it sees different values here. Set this to false to disable this check. (Default: do assert a single source value)

output_chunk_max_count: 500

After this many items have been written, close and re-open the output. (Default: 500 items)

output_max_clean_visible_bytes: 1000000

After this much streamcorpus.StreamItem.clean_visible content has been written, close and re-open the output. (Default: write entire output in one batch)

external_stages_path: stages.py

The file stages.py is a Python module that declares a top-level dictionary named Stages, a map from stage name to implementing class. Stages defined in this file can be used in any of the appropriate stage lists.

external_stages_modules: [ example.stages ]

The Python module example.stages declares a top-level dictionary named Stages, a map from stage name to implementing class. The named modules must be on sys.path so that the Python interpreter can find them. Stages defined in these modules can be used in any of the appropriate stage lists.
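A minimal external stages module could look like this sketch. The uppercase_titles stage is hypothetical, and it operates on plain dicts here rather than real streamcorpus.StreamItem objects; the documented contract is only the top-level Stages dictionary.

```python
# example/stages.py -- a hypothetical external stage module.
# streamcorpus_pipeline looks for the top-level ``Stages`` dict.

class uppercase_titles(object):
    """Toy incremental transform; real stages receive and return
    streamcorpus.StreamItem objects (plain dicts used here)."""

    def __init__(self, config, *args, **kwargs):
        self.config = config

    def __call__(self, si, context=None):
        # Upper-case a title field if present, pass the item on.
        if si.get('title'):
            si['title'] = si['title'].upper()
        return si

Stages = {'uppercase_titles': uppercase_titles}
```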

3.3.2. API

The standard execution path is to pass the streamcorpus_pipeline module to yakonfig.parse_args(), then use PipelineFactory to create Pipeline objects from the resulting configuration.


Todo: make the top-level configuration point PipelineFactory, not the streamcorpus_pipeline module

class streamcorpus_pipeline._pipeline.PipelineFactory(registry)[source]

Factory to create Pipeline objects from configuration.

Call this to get a Pipeline object. Typical programmatic use:

import argparse, yakonfig, streamcorpus_pipeline
from streamcorpus_pipeline._pipeline import PipelineFactory
from streamcorpus_pipeline.stages import StageRegistry

parser = argparse.ArgumentParser()
args = yakonfig.parse_args(parser, [yakonfig, streamcorpus_pipeline])
factory = PipelineFactory(StageRegistry())
pipeline = factory(yakonfig.get_global_config('streamcorpus_pipeline'))

This factory class will instantiate all of the stages named in the streamcorpus_pipeline configuration. These stages will be created with their corresponding configuration, except that they have two keys added, tmp_dir_path and third_dir_path, from the top-level configuration.


Create a pipeline factory.

  • config (dict) – top-level “streamcorpus_pipeline” configuration
  • registry (StageRegistry) – registry of stages

Create a Pipeline.

Pass in the configuration under the streamcorpus_pipeline block, not the top-level configuration that contains it.

If tmp_dir_suffix is None, then locks the factory and creates stages with a temporary (UUID) value. If the configuration has cleanup_tmp_files set to True (the default) then executing the resulting pipeline will clean up the directory afterwards.

Parameters:config (dict) – streamcorpus_pipeline configuration
Returns:new pipeline instance

The streamcorpus_pipeline.stages.StageRegistry used to find pipeline stages.


A string value that is appended to tmp_dir_path when creating pipeline stages. If None, use the top-level tmp_dir_path configuration directly.


A threading.Lock to protect against concurrent modification of tmp_dir_suffix.

create(stage, scp_config, config=None)[source]

Create a pipeline stage.

Instantiates stage with config. This essentially translates to stage(config), except that two keys from scp_config are injected into the configuration: tmp_dir_path is an execution-specific directory from combining the top-level tmp_dir_path configuration with tmp_dir_suffix; and third_dir_path is the same path from the top-level configuration. stage may be either a callable returning the stage (e.g. its class), or its name in the configuration.

scp_config is the configuration for the pipeline as a whole, and is required. config is the configuration for the stage; if it is None then it is extracted from scp_config.
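The key injection described above can be sketched as follows; build_stage_config is a hypothetical helper illustrating the behavior, not the factory's actual code.

```python
import os

def build_stage_config(scp_config, stage_name, tmp_dir_suffix=''):
    """Combine the stage's own block with tmp_dir_path (plus the
    per-execution suffix) and third_dir_path from the pipeline
    configuration, as described above."""
    config = dict(scp_config.get(stage_name, {}))
    config['tmp_dir_path'] = os.path.join(scp_config['tmp_dir_path'],
                                          tmp_dir_suffix)
    config['third_dir_path'] = scp_config['third_dir_path']
    return config

stage_cfg = build_stage_config(
    {'tmp_dir_path': '/tmp', 'third_dir_path': '/third',
     'clean_html': {'include_mime_types': ['text/html']}},
    'clean_html', 'run-1')
```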

If you already have a fully formed configuration block and want to create a stage, you can call

stage = factory.create(stage_cls, scp_config, config)

In most cases if you have a stage class object and want to instantiate it with its defaults you can call

stage = stage_cls(stage_cls.default_config)


This mirrors yakonfig.factory.AutoFactory.create(), with some thought that this factory class might migrate to using that as a base in the future.

  • stage – pipeline stage class, or its name in the registry
  • scp_config (dict) – configuration block for the pipeline
  • config (dict) – configuration block for the stage, or None to get it from scp_config
class streamcorpus_pipeline._pipeline.Pipeline(rate_log_interval, input_item_limit, cleanup_tmp_files, tmp_dir_path, assert_single_source, output_chunk_max_count, output_max_clean_visible_bytes, reader, incremental_transforms, batch_transforms, post_batch_incremental_transforms, writers)[source]

Pipeline for extracting data into StreamItem instances.

The pipeline has five sets of stages. The reader stage reads from some input source and produces a series of StreamItem objects out. Incremental transforms take single StreamItem objects in and produce single StreamItem objects out. Batch transforms run on the entire set of StreamItem objects together. There is a further set of post-batch incremental transforms which again run on individual StreamItem objects. Finally, any number of writers send output somewhere, usually a streamcorpus.Chunk file.

__init__(rate_log_interval, input_item_limit, cleanup_tmp_files, tmp_dir_path, assert_single_source, output_chunk_max_count, output_max_clean_visible_bytes, reader, incremental_transforms, batch_transforms, post_batch_incremental_transforms, writers)[source]

Create a new pipeline object.


Todo: make this callable with just the lists of stages and give sane (language-level) defaults for the rest

  • rate_log_interval (int) – print progress every time this many input items have been processed
  • input_item_limit (int) – stop after this many items
  • cleanup_tmp_files (bool) – delete tmp_dir_path after execution if true
  • tmp_dir_path (str) – path for intermediate files
  • assert_single_source (bool) – require all items to have the same source value if true
  • output_chunk_max_count (int) – restart output after writing this many items
  • output_max_clean_visible_bytes (int) – restart output after writing this much content
  • reader (callable) – reader stage object
  • incremental_transforms (list of callable) – single-item transformation stages
  • batch_transforms (list of callable) – chunk-file transformation stages
  • post_batch_incremental_transforms (list of callable) – single-item transformation stages
  • writers (list of callable) – output stages
run(i_str, start_count=0, start_chunk_time=None)[source]

Run the pipeline.

This runs all of the steps described in the pipeline constructor, reading from some input and writing to some output.

  • i_str (str) – name of the input file, or other reader-specific description of where to get input
  • start_count (int) – index of the first stream item
  • start_chunk_time (int) – timestamp for the first stream item

_process_task(work_unit)[source]

Process a rejester.WorkUnit.

The work unit’s key is taken as the input file name. The data should have start_count and start_chunk_time values, which are passed on to run().

Parameters:work_unit (rejester.WorkUnit) – work unit to process
Returns:number of stream items processed

3.4. Pipeline stages

Registry of stages for streamcorpus_pipeline.

Stages are implemented as callable Python objects. In almost all cases the name of the stage in the configuration file is the same as its class name.

3.4.1. Readers

class streamcorpus_pipeline._convert_kba_json.convert_kba_json(config, *args, **kwargs)[source]

Returns a streamcorpus_pipeline “reader” that generates a single streamcorpus.Chunk file containing the John Smith corpus.

class streamcorpus_pipeline._local_storage.from_local_chunks(config, *args, **kwargs)[source]

May use config['max_retries'] (default 1), config['max_backoff'] (seconds, default 300), and config['streamcorpus_version'] (default 'v0_3_0').

class streamcorpus_pipeline._local_storage.from_local_files(config)[source]

Read plain text or other raw content from local files.

Each input filename is taken as an actual filename. This produces one stream item per file. This can be configured as follows:

abs_url: null
url_prefix: file://
absolute_filename: true
epoch_ticks: file
encoding: null

If abs_url is provided in configuration, this value is used as-is as the corresponding field in the stream item. Otherwise, the path is converted to an absolute path if absolute_filename is true, and then url_prefix is prepended to it.

epoch_ticks indicates the corresponding time stamp in the stream item. This may be an integer for specific seconds since the Unix epoch, the string file indicating to use the last-modified time of the file, or the string now for the current time.

encoding should be set if you know the encoding of the files you’re loading. e.g., utf-8.

The values shown above are the defaults. If reading in files in a directory structure matching a URL hierarchy, an alternate configuration could be

url_prefix: "http://home.example.com/"
absolute_filename: false
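The abs_url construction described above reduces to roughly this sketch; make_abs_url is a hypothetical helper, not the reader's actual code.

```python
import os

def make_abs_url(filename, abs_url=None, url_prefix='file://',
                 absolute_filename=True):
    """Apply the rules above: a configured abs_url wins outright;
    otherwise optionally absolutize the path and prepend url_prefix."""
    if abs_url is not None:
        return abs_url
    path = os.path.abspath(filename) if absolute_filename else filename
    return url_prefix + path

url = make_abs_url('sample', url_prefix='http://home.example.com/',
                   absolute_filename=False)
```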
class streamcorpus_pipeline._kvlayer.from_kvlayer(*args, **kwargs)[source]

Read documents from a kvlayer supported database.

kvlayer must be included in the top-level configuration. Typically this reads stream items previously written by to_kvlayer.

The input string is a sequence of stream IDs; see __call__() for details. If no values are specified, scans the entire STREAM_ITEMS_TABLE table.


Actually scan the table, yielding stream items.

i_str consists of a sequence of 20-byte encoded stream IDs. These are the 16 bytes of the document ID, plus the 4-byte big-endian stream time. serialize_si() can produce these. i_str can then consist of a single encoded stream ID; an encoded stream ID, a literal <, and a second encoded stream ID to specify a range of documents; or a list of either of the preceding separated by literal ;.
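Assuming the layout described above (16 bytes of document ID followed by a 4-byte big-endian time), an encoded stream ID can be built with struct; encode_stream_id is a hypothetical helper for illustration, not the package's serialize_si().

```python
import binascii
import struct

def encode_stream_id(doc_id_hex, epoch_ticks):
    """Pack the 20-byte stream ID layout described above."""
    return binascii.unhexlify(doc_id_hex) + struct.pack('>I', epoch_ticks)

sid = encode_stream_id('ebf059defa41b2812c1f969f28cdb45e', 1360783522)
# Two IDs joined with a literal '<' specify a range of documents.
range_query = sid + b'<' + sid
```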


Todo: make this support keyword index strings

class streamcorpus_pipeline._s3_storage.from_s3_chunks(config)[source]

Reads data from Amazon S3 one key at a time. The type of data read can be specified with the input_format config option. The following values are legal: streamitem, featurecollection or spinn3r.

When the input format is streamitem or spinn3r, then this reader produces a generator of streamcorpus.StreamItem instances.

When the input format is featurecollection, then this reader produces a generator of dossier.fc.FeatureCollection instances.

bucket is the s3 bucket to use if input paths are not full s3://{bucket}{path} URIs. aws_access_key_id_path and aws_secret_access_key_path should point to files containing your s3 credentials. Alternately, credentials can be provided in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

The rest of the configuration options are optional and are described in the following example:

  # How to connect to S3
  # bucket can also come from input path if full uri s3://{bucket}{path}
  bucket: aws-publicdatasets
  # can also come from environment variable AWS_ACCESS_KEY_ID
  aws_access_key_id_path: keys/aws_access_key_id
  # can also come from environment variable AWS_SECRET_ACCESS_KEY
  aws_secret_access_key_path: keys/aws_secret_access_key

  # Optional parameters.
  # The number of times to try reading from s3. A value of
  # `1` means the download is tried exactly once.
  # The default value is `10`.
  tries: 1

  # This is prepended to every key given. Namely, all s3 URLs
  # are formed by `s3://{bucket}/{s3_path_prefix}/{input-key-name}`.
  # By default, it is empty.
  s3_path_prefix: some/s3/prefix

  # A path to a private GPG decryption key file.
  gpg_decryption_key_path: keys/gpg-key

  # The type of data to read from s3. Valid values are
  # "StreamItem", "spinn3r" or "FeatureCollection".
  # The default is "StreamItem".
  input_format: StreamItem

  # When the input format is "StreamItem", this indicates the
  # Thrift version to use. Defaults to "v0_3_0".
  streamcorpus_version: v0_3_0

  # When set, an md5 is extracted from the s3 key and is used
  # to verify the decrypted and decompressed content downloaded.
  # This is disabled by default.
  compare_md5_in_file_name: true

  # A temporary directory where intermediate files may reside.
  # Uses your system's default tmp directory (usually `/tmp`)
  # by default.
  tmp_dir_path: /tmp
class streamcorpus_pipeline._john_smith.john_smith(config, *args, **kwargs)[source]
class streamcorpus_pipeline._yaml_files_list.yaml_files_list(config, *args, **kwargs)[source]

Read and tag flat files as described in a YAML file.

When this reader stage is in use, the input file to the pipeline is a YAML file. That YAML file points at several other files and directories, and the contents of those files are emitted as stream items.

The YAML file has the following format:

root_path: /data/directory
annotator_id: my-name
source: weblog
entities:
  - target_id: "https://kb.diffeo.com/sample"
    doc_path:
      - sample
      - doc_path: copies/sample
        abs_url: "http://source.example.com/sample"
    external_profiles:
      - doc_path: wikipedia/Sample
        abs_url: "http://en.wikipedia.org/wiki/Sample"
    slots:
      - canonical_name: Sample
      - example
      - title: [Sample, "dot com"]
      - knowledge_bases:
          - value: wikipedia
            profile_id: "http://en.wikipedia.org/wiki/Sample"
            annotator_id: my-name

At the top level, the file contains:

root_path (default: current directory)
Root path to the data to read.
annotator_id (required)
Annotator ID string on produced document ratings.
source (default: unknown)
Source type for produced stream items.
entities (required)
List of entity document sets to read.

entities is a list of dictionaries. Each entity dictionary has the following items:

target_id (required)
Target entity URL for produced ratings
doc_path (required)
List of source files or directories. These may be strings or dictionaries. If dictionaries, doc_path is the filesystem path and abs_url is the corresponding URL; if strings, the strings are filesystem paths and the URL is generated. Filesystem paths are interpreted relative to the root_path.
external_profiles (optional)
List of additional source files or directories that are external profiles, such as Wikipedia articles. These are processed in the same way as items in doc_path but must be in dictionary form, where the abs_url field identifies the external knowledge base article.
slots (required)
Additional knowledge base metadata about the entity.

For each entity, the reader finds all documents in the named directories and creates a streamcorpus.StreamItem for each. The body has its raw data set to the contents of the file. The stream items are also tagged with a streamcorpus.Rating where the annotator comes from the file metadata and the target comes from the entity target_id. The mentions on the rating are set to the union of all of the slot values for all slots.

The slots field is a list of values. Each value is interpreted as follows:

  1. If it is a dictionary, then each key in the dictionary is interpreted as a slot name, and the corresponding value provides the slot values. Otherwise, the value is taken as a value for the slot NAME.
  2. If the slot value is a list, then each item in the list is independently interpreted as a value for this slot.
  3. If the slot value is a dictionary, then it may have keys value (required), mention_id, profile_id, and annotator_id, which are used to construct a composite slot value. Otherwise, the slot value is used directly without additional metadata.
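The three rules can be sketched as follows; normalize_slots is a hypothetical helper illustrating the interpretation, not the reader's actual code.

```python
def normalize_slots(slots, default_name='NAME'):
    """Expand a slots list into (slot_name, value_dict) pairs
    following the three rules above."""
    pairs = []
    for item in slots:
        if isinstance(item, dict):                 # rule 1: named slots
            named = item.items()
        else:                                      # bare value -> NAME slot
            named = [(default_name, item)]
        for name, value in named:
            # rule 2: a list yields one value per element
            values = value if isinstance(value, list) else [value]
            for v in values:
                if isinstance(v, dict):            # rule 3: composite value
                    pairs.append((name, v))
                else:
                    pairs.append((name, {'value': v}))
    return pairs

pairs = normalize_slots(['one', {'NAME': 'two'}, {'NAME': ['three', 'four']},
                         {'NAME': {'value': 'five'}}])
```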

The following YAML fragment provides several values for the NAME slot, which are ultimately combined:

  - one
  - NAME: two
  - NAME: [three, four]
  - NAME:
      value: five
class streamcorpus_pipeline._spinn3r_feed_storage.from_spinn3r_feed(config={}, prefetched=None)[source]

streamcorpus-pipeline reader for spinn3r.com feeds.

Accepted configuration items include:

use_prefetched
If set to a boolean value, always/never use prefetched data (default: unset; use prefetched data if present)

publisher_type
If set, only return stream items whose publisher type in the spinn3r feed exactly matches this string

A dictionary from URL to prefetched data can be passed as a parameter to the stage constructor. If use_prefetched is True then all input strings must be present in the prefetch dictionary, and this stage never makes an outgoing network connection. If use_prefetched is False then the prefetch dictionary is ignored. Otherwise if an input string is present in the prefetch dictionary, then the prefetched data is used, and if not, it is fetched from the network.
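That decision logic reduces to roughly the following; this is a sketch of the described rules, not the stage's actual code, and fetch/network_fetch are hypothetical names.

```python
def fetch(i_str, prefetched=None, use_prefetched=None, network_fetch=None):
    """Choose between prefetched data and a network fetch, following
    the rules described above."""
    prefetched = prefetched or {}
    if use_prefetched is True:
        # All inputs must be prefetched; never touch the network.
        return prefetched[i_str]
    if use_prefetched is False:
        return network_fetch(i_str)
    # Unset: prefer prefetched data when present, else fetch.
    if i_str in prefetched:
        return prefetched[i_str]
    return network_fetch(i_str)
```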

class streamcorpus_pipeline._serifxml.from_serifxml(config, *args, **kwargs)[source]

Read a Serif XML intermediate file as the input to the pipeline.

This is a specialized reader for unusual circumstances; you will still need to run serif with special settings to complete the tagging. This expects to find serifxml flat files in a directory and creates a Tagging with raw_tagging holding the serifxml string. This Tagging is stored in taggings.

This also fills in raw field.

This has one configuration option, which can usually be left at its default value:

    tagger_id: serif

tagger_id is the tagger name in the generated StreamItem.

To obtain sentences, one must run Serif in the special read_serifxml mode:

  third_dir_path: /third
  tmp_dir_path: tmp
  reader: from_serifxml
  incremental_transforms:
    - language
    - guess_media_type
    - clean_html
    - clean_visible
    - title
  batch_transforms:
    - serif
  language:
    force:
      name: English
      code: en
  guess_media_type:
    fallback_media_type: text/plain
  serif:
    path_in_third: serif/serif-latest
    serif_exe: bin/x86_64/Serif
    par: streamcorpus_read_serifxml
    par_additions:
      streamcorpus_read_serifxml:
        - "# example additional line"
  writer: to_local_chunks
  to_local_chunks:
    output_type: otherdir
    output_path: test_output
    output_name: "%(input_fname)s"

3.4.2. Incremental transforms

class streamcorpus_pipeline._clean_html.clean_html(*args, **kwargs)[source]

Create body.clean_html from body.raw.

Configuration options:

require_language_code: en

If set, only work on stream items whose language code is the specified value; pass others on unchanged. Defaults to unset.

include_language_codes: [en, ""]

If set to a non-empty list, only work on stream items whose language code is one of the specified values; pass others on unchanged. Defaults to empty list (process all languages).

include_mime_types: ['text/html']

If set to a non-empty list, only work on stream items whose mime type matches one of the specified values. If set to an empty list, then all stream items (assuming they match the above language criteria) will have clean_html populated from body.raw. Default to list of just text/html.
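The combined selection criteria amount to roughly the following; should_clean is a hypothetical predicate sketching the described behavior.

```python
def should_clean(si_language, si_media_type,
                 require_language_code=None,
                 include_language_codes=(),
                 include_mime_types=('text/html',)):
    """Apply clean_html's language and MIME-type criteria as
    described above; an empty list disables that criterion."""
    if (require_language_code is not None
            and si_language != require_language_code):
        return False
    if include_language_codes and si_language not in include_language_codes:
        return False
    if include_mime_types and si_media_type not in include_mime_types:
        return False
    return True
```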

class streamcorpus_pipeline.force_clean_html.force_clean_html(config, *args, **kwargs)[source]

Force clean_html to be populated, or reject the StreamItem.

class streamcorpus_pipeline._clean_visible.clean_visible(config, *args, **kwargs)[source]

Create body.clean_visible from body.clean_html.

clean_visible maintains the byte position of all visible text in an HTML (or XML) document and removes all parts that are not visible in a web browser. This allows taggers to operate on the visible-only text, and any standoff annotation can refer to the original byte positions.

If there is no clean_html, but there is a raw property with a text/plain media type, use that value directly.

This has no useful configuration options. The configuration metadata will include a setting:

require_clean_html: true

Setting this to false will always fail.

class streamcorpus_pipeline.offsets.xpath_offsets(config)[source]
class streamcorpus_pipeline._pdf_to_text.pdf_to_text(config, *args, **kwargs)[source]

returns a kba.pipeline “transform” function that attempts to generate stream_item.body.clean_visible from body.raw

class streamcorpus_pipeline._docx_to_text.docx_to_text(config, *args, **kwargs)[source]

returns a kba.pipeline “transform” function that attempts to generate stream_item.body.clean_visible from body.raw

class streamcorpus_pipeline._title.title(*args, **kwargs)[source]

Create “title” entry in other_content, if it does not already exist.

class streamcorpus_pipeline._filters.debug_filter(config, *args, **kwargs)[source]

Remove all stream items except specified ones.

This is only needed to debug troublesome configurations if there are problems with specific stream items. It has one configuration item:

  accept_stream_ids:
    - 1360783522-ebf059defa41b2812c1f969f28cdb45e

Only stream IDs in the list are passed on, all others are dropped. Defaults to an empty list (drop all stream items).

class streamcorpus_pipeline._filters.filter_domains(config, *args, **kwargs)[source]

Remove stream items that are not from a specific domain, by inspecting first abs_url and then schost. Domain name strings are cleansed.

  include_domains: [example.com]
  include_domains_path:
    - path-to-file-with-one-domain-per-line.txt
    - path-to-file-with-one-domain-per-line2.txt
class streamcorpus_pipeline._filters.filter_domains_substrings(config, *args, **kwargs)[source]

Remove stream items that are not from a domain whose name contains one of the specified strings by inspecting first abs_url and then schost. Substrings are used exactly as provided without cleansing.

  include_substrings: [example]
  include_substrings_path:
    - path-to-file-with-one-substring-per-line.txt
    - path-to-file-with-one-substring-per-line2.txt
class streamcorpus_pipeline._fix_text.fix_text(config, *args, **kwargs)[source]

Tries to fix poorly encoded Unicode text on a stream item. It expects the raw Unicode encoded text in the read_from field (default is raw) and writes the fixed text as UTF8 encoding to the write_to field (default is clean_visible).

class streamcorpus_pipeline._dedup.dedup(*args, **kwargs)[source]
class streamcorpus_pipeline._dump_label_stats.dump_label_stats(config)[source]
class streamcorpus_pipeline._filters.id_filter(*args, **kwargs)[source]

Include or exclude specific stream items by doc_id or stream_id

If specific stream items have been identified, e.g. as causing problems with a tagger or other stages, then this can remove them before they run.

  excluded_stream_ids_path: file-of-one-stream-id-per-line.txt.xz
  excluded_stream_ids:
    - 1360783522-ebf059defa41b2812c1f969f28cdb45e
  included_stream_ids_path: file-of-one-stream-id-per-line.txt.xz
  included_stream_ids:
    - 1360783522-ebf059defa41b2812c1f969f28cdb45e
  included_doc_ids_path: file-of-one-doc-id-per-line.txt.xz
  included_doc_ids:
    - ebf059defa41b2812c1f969f28cdb45e
  excluded_doc_ids_path: file-of-one-doc-id-per-line.txt.xz
  excluded_doc_ids:
    - ebf059defa41b2812c1f969f28cdb45e

Stream IDs in the excluded_stream_ids list are dropped, all others are passed on unmodified. Defaults to an empty list (don’t drop any stream items).

In addition to listing stream IDs in the YAML itself, you can pass excluded_stream_ids_path with one stream ID per line. This file may optionally be xz-compressed.
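One plausible reading of the include/exclude rules is the following; id_filter_keep is a hypothetical predicate, not the stage's actual code.

```python
def id_filter_keep(stream_id, doc_id,
                   excluded_stream_ids=(), included_stream_ids=(),
                   excluded_doc_ids=(), included_doc_ids=()):
    """Include lists restrict what passes; exclude lists drop
    matches; everything else passes through unmodified."""
    if included_stream_ids and stream_id not in included_stream_ids:
        return False
    if included_doc_ids and doc_id not in included_doc_ids:
        return False
    return (stream_id not in excluded_stream_ids
            and doc_id not in excluded_doc_ids)
```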


returns a kba.pipeline “transform” function that generates file type stats from the stream_items that it sees. Currently, these stats are just the first five non-whitespace characters.

class streamcorpus_pipeline._filters.filter_languages(config, *args, **kwargs)[source]

Remove stream items that aren’t in specific languages.

This looks at the body language field. It has two configuration options:

  allow_null_language: True
  included_language_codes: [en]

If the language field is empty, the stream item is dropped unless allow_null_language is true (default value). Otherwise, the stream item is dropped unless its language code is one of the included_language_codes values (defaults to empty list).
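The keep/drop decision reduces to the following; keep_item is a hypothetical predicate sketching the described behavior.

```python
def keep_item(language_code, allow_null_language=True,
              included_language_codes=()):
    """filter_languages' decision as described above: empty language
    passes only if allow_null_language; otherwise the code must be
    in included_language_codes."""
    if not language_code:
        return allow_null_language
    return language_code in included_language_codes
```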

class streamcorpus_pipeline._filters.filter_tagger_ids(config, *args, **kwargs)[source]

Remove stream items that lack a StreamItem.body.sentences entry from specified tagger_id(s)

This looks at the sentences field. It has two configuration options:

  tagger_ids_to_keep:
    - serif

If none of the tagger_ids_to_keep are present in sentences, the stream item is dropped. If one of the required tagger_ids is present but points to an empty list, the item is also dropped.

class streamcorpus_pipeline._find.find(config)[source]
class streamcorpus_pipeline._find.find_doc_ids(config)[source]
class streamcorpus_pipeline._guess_media_type.guess_media_type(config, *args, **kwargs)[source]

returns a kba.pipeline “transform” function that populates body.media_type if it is empty and the content type is easily guessed.

class streamcorpus_pipeline._handle_unconvertible_spinn3r.handle_unconvertible_spinn3r(config, *args, **kwargs)[source]

It seems that some of the spinn3r content is not actually UTF-8, and there is no record of the original encoding, so we take a shot at converting the spinn3r-provided “content_extract” into UTF-8 and using it as the clean_html. If that fails, we drop the entire document from the corpus.

Creates document labels from hyperlinks in body.clean_html.

The labels are attached to the stream item body with an annotator ID of author. Any HTML <a href="..."> matching the selection criteria will be turned into a label.

You generally must set either all_domains or domain_substrings. A typical configuration will look like:

  incremental_transforms: [ ..., hyperlink_labels, ... ]
  hyperlink_labels:
    require_abs_url: true
    domain_substrings: [ "trec-kba.org" ]

Configuration options:

all_domains: false
domain_substrings: [ 'trec-kba.org' ]

A label will only be produced if all_domains is true, or if the hostname part of the URL has one of domain_substrings as a substring. Note that the default configuration is all_domains false and an empty domain_substrings list, so no labels will be produced without additional configuration.

require_abs_url: true

Only produce labels for fully-qualified http://... URLs. False by default.

offset_types: [BYTES]

A list containing at least one of BYTES and LINES indicating what sort of document offset should be attached to the label. Only the first value is used. LINES is not recommended.

require_clean_html: true

Cannot be changed.
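The selection rules above can be sketched as a predicate (a hypothetical helper written for Python 3, not the stage's actual code):

```python
from urllib.parse import urlparse

def should_label(href, all_domains=False, domain_substrings=(),
                 require_abs_url=False):
    """Decide whether an <a href="..."> produces a label, per the
    documented rules: optionally require an absolute http:// URL, then
    accept all domains or match hostname substrings."""
    if require_abs_url and not href.startswith('http://'):
        return False
    if all_domains:
        return True
    hostname = urlparse(href).hostname or ''
    return any(substring in hostname for substring in domain_substrings)
```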

class streamcorpus_pipeline._upgrade_streamcorpus.keep_annotated(config)[source]

returns a kba.pipeline “transform” function that takes in v0_1_0 StreamItems and emits v0_2_0 StreamItems

class streamcorpus_pipeline._language.language(config, *args, **kwargs)[source]

Guess at a language from body.

This always adds a language annotation to the body, but if the body does not have a raw part or the language cannot be reliably detected, it may be empty.

This attempts to detect on body.clean_html if present, then on body.clean_visible if present; failing those, it uses body.encoding to decode body.raw and tries to detect the language of the result; finally, it attempts detection on the undecoded body.raw.

This has no configuration options.
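The fallback order can be sketched as follows (a hypothetical helper; body is assumed to be any object with optional clean_html, clean_visible, encoding, and raw attributes):

```python
def detection_text(body):
    """Choose the text to run language detection on, following the
    documented fallback order: clean_html, then clean_visible, then
    decoded raw, then undecoded raw."""
    if getattr(body, 'clean_html', None):
        return body.clean_html
    if getattr(body, 'clean_visible', None):
        return body.clean_visible
    raw = getattr(body, 'raw', None)
    encoding = getattr(body, 'encoding', None)
    if raw and encoding:
        try:
            return raw.decode(encoding)
        except (LookupError, UnicodeDecodeError):
            pass
    return raw  # last resort: undecoded bytes (or None)
```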

class streamcorpus_pipeline._nilsimsa.nilsimsa(config, *args, **kwargs)[source]

Creates other_content with key nilsimsa and a raw property set to Nilsimsa(cleanse(si.body.clean_visible)).hexdigest()

This has no configuration options.

class streamcorpus_pipeline._filters.remove_raw(config, *args, **kwargs)[source]

Remove the raw form of the content.

While the body raw form is important as inputs to create the “clean” forms, it may be bulky and not useful later. This strips the raw form, replacing it with an empty string. It has one configuration option:

  if_clean_visible_remove_raw: True

If if_clean_visible_remove_raw is set (it defaults to false), the raw form is removed only when the clean_visible form is available; if the option is unset, the raw form is always removed.

class streamcorpus_pipeline._filters.replace_raw(config, *args, **kwargs)[source]

Replace the raw form of the content with clean_html, or delete the StreamItem if it lacks clean_html.

While the body raw form is important as inputs to create the “clean” forms, it may be bulky and not useful later. This strips the raw form, replacing it with clean_html.

class streamcorpus_pipeline._filters.dump_stream_id_abs_url(config, *args, **kwargs)[source]

print stream_id, abs_url strings to stdout and do not pass any StreamItems down the pipeline

class streamcorpus_pipeline._upgrade_streamcorpus.upgrade_streamcorpus(config, *args, **kwargs)[source]

returns a kba.pipeline “transform” function that takes in v0_1_0 StreamItems and emits v0_2_0 StreamItems

class streamcorpus_pipeline._upgrade_streamcorpus_v0_3_0.upgrade_streamcorpus_v0_3_0(config, *args, **kwargs)[source]
class streamcorpus_pipeline._tokenizer.nltk_tokenizer(config)[source]

Minimal tokenizer using nltk.

This is an incremental transform that creates minimal tokens and does minimal label alignment. The stream item must have a clean_visible part already. The stream item is tagged with nltk_tokenizer.

This tokenizer has very few dependencies, only the nltk Python package. However, the tokens that it creates are extremely minimal, with no part-of-speech details or other information. The tokens only contain the token property and character-offset information.

If there are labels with character offsets attached to the stream item body, and this stage is configured with an annotator_id, then this stage attempts to attach those labels to tokens.

3.4.3. Batch transforms

class streamcorpus_pipeline._taggers.byte_offset_align_labels(config, *args, **kwargs)[source]

requires config[‘annotator_id’] (which person/org did manual labelling) and config[‘tagger_id’] (which software did the tagging to process)

class streamcorpus_pipeline._taggers.line_offset_align_labels(config, *args, **kwargs)[source]

requires config[‘annotator_id’] (which person/org did manual labelling) and config[‘tagger_id’] (which software did the tagging to process)

class streamcorpus_pipeline._taggers.name_align_labels(config, *args, **kwargs)[source]

requires config[‘chain_selector’] = ALL | ANY and config[‘annotator_id’] (which person/org did manual labelling)

class streamcorpus_pipeline._lingpipe.lingpipe(*args, **kwargs)[source]

a streamcorpus_pipeline batch transform that converts a chunk into a new chunk with data generated by LingPipe

class streamcorpus_pipeline._serif.serif(*args, **kwargs)[source]

Batch transform to parse and tag documents with Serif.

Serif must be installed externally, somewhere underneath the directory configured as third_dir_path in the top-level pipeline configuration.

A typical Serif pipeline configuration looks like:

  tmp_dir_path: tmp
  third_dir_path: /opt/diffeo/third
  reader: ...
  incremental_transforms:
    - language
    - guess_media_type
    - clean_html
    - clean_visible
  batch_transforms: [serif]
  writers: [...]
  serif:
    path_in_third: serif/serif-latest
    par: streamcorpus_one_step
    align_labels_by: names_in_chains
    aligner_data:
      chain_selector: ALL
      annotator_id: annotator

Configuration options include:

path_in_third (required)
Relative path in third_dir_path to directory containing serif data directories
serif_exe (default: bin/x86_64/Serif)
Relative path within path_in_third to the Serif executable file
par (default: streamcorpus_one_step)
Serif policy configuration; this is typically streamcorpus_one_step, but may also be streamcorpus_read_serifxml or streamcorpus_generate_serifxml
Map from policy configuration name to list of strings to append as additional lines to customize the policy.
cleanup_tmp_files (default: true)
Delete the intermediate files used by Serif

The two “align” options control how ratings on the document are associated with tokens generated by Serif.

3.4.4. Writers

class streamcorpus_pipeline._local_storage.to_local_chunks(config, *args, **kwargs)[source]

Write output to a local chunk file.


This stage must be listed last in the writers list. For efficiency it renames the intermediate chunk file, and so any subsequent writer stages will have no input chunk file to work with.

This stage may take several additional configuration parameters.

output_type: samedir

Where to place the output: inplace replaces the input file; samedir uses the output_name in the same directory as the input; and otherdir uses output_name in output_path.

output_path: /home/diffeo/outputs

If output_type is otherdir, the name of the directory to write to.

output_name: output-%(first)d.sc

Gives the name of the output file if output_type is not inplace. If this is input then the output file name is automatically derived from the input file name. This may also include Python format string parameters:

  • %(input_fname)s: the basename of the input file name
  • %(input_md5)s: the last part of a hyphenated input filename
  • %(md5)s: MD5 hash of the chunk file
  • %(first)d: index of the first item in the chunk file
  • %(num)d: number of items in the chunk file
  • %(epoch_ticks)d: timestamp of the first item in the chunk file
  • %(target_names)s: hyphenated list of rating target names
  • %(source)s: source name for items in the chunk file
  • %(doc_ids_8)s: hyphenated list of 8-character suffixes of document IDs
  • %(date_hour)s: partial timestamp including date and hour
  • %(rand8)s: 8-byte random hex string

compress: true

If specified, append .xz to the output file name and LZMA-compress the output file. Defaults to false.

cleanup_tmp_files: false

If set to false, the intermediate file is not deleted (or renamed away), enabling later writer stages to be run. Defaults to true.
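The output_name substitution keys are ordinary Python %-style mapping keys, so a template expands like this (the parameter values here are illustrative only; the pipeline fills them in from the chunk):

```python
# Illustrative values for a few of the documented substitution keys.
params = {
    'first': 0,
    'num': 500,
    'source': 'news',
    'date_hour': '2012-03-15-06',
}

# %-style mapping expansion, exactly as Python's string formatting does it.
assert 'output-%(first)d.sc' % params == 'output-0.sc'
assert '%(date_hour)s/%(source)s-%(num)d' % params == '2012-03-15-06/news-500'
```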

class streamcorpus_pipeline._local_storage.to_local_tarballs(config, *args, **kwargs)[source]
class streamcorpus_pipeline._kvlayer.to_kvlayer(*args, **kwargs)[source]

Writer that puts stream items in kvlayer.

kvlayer must be included in the top-level configuration. Compressed stream items are written to the STREAM_ITEMS_TABLE table.

This writer has one configuration parameter: indexes adds additional indexes on the data. Supported index types include WITH_SOURCE, BY_TIME, HASH_TF_SID, HASH_FREQUENCIES, and HASH_KEYWORD. The special index type KEYWORDS expands to all of the keyword-index tables.

If keyword indexing is enabled, it indexes the tokens generated by some or all of the taggers that have been run, less a set of stop words. The configuration parameter keyword_tagger_ids may be set to a list of tagger IDs; if it is set, only the tokens from those tagger IDs will be indexed. If it is set to null or left unset, all taggers’ tokens will be indexed as distinct words.

class streamcorpus_pipeline._s3_storage.to_s3_chunks(config)[source]

Copies chunk files on disk to Amazon S3. The type of data written can be specified with the output_format config option. The following values are legal: streamitem and featurecollection.

N.B. The format only affects naming: it is used in constructing the output_name and in picking between an fc (for feature collections) and an sc (for stream items) extension.

The following configuration options are mandatory: bucket is the s3 bucket to use. aws_access_key_id_path and aws_secret_access_key_path should point to files containing your s3 credentials.

output_name is also required and is expanded in the same way as the to_local_chunks writer. The filename always has .{sc,fc}.{xz,gz,sz} appended to it (depending on the output format specified), and correspondingly, the output file is always compressed. If GPG keys are provided, then .gpg is appended and the file is encrypted.

The rest of the configuration options are optional and are described in the following example:

  # Mandatory
  bucket: aws-publicdatasets
  aws_access_key_id_path: keys/aws_access_key_id
  aws_secret_access_key_path: keys/aws_secret_access_key
  output_name: "%(date_hour)s/%(source)s-%(num)d-%(input_fname)s-%(md5)s"

  # Optional parameters.

  # The number of times to try writing to s3. A value of
  # `1` means the upload is tried exactly once.
  # The default value is `10`.
  # (This also applies to the verification step.)
  tries: 1

  # When set, the file uploaded will be private.
  # Default: false
  is_private: false

  # When set, the file will be re-downloaded from Amazon, decrypted,
  # decompressed and checksummed against the data sent to Amazon.
  # (This used to be "verify_via_http", but this more broadly named
  # option applies even when "is_private" is true.)
  # Default: true
  verify: true

  # If verification fails `tries` times, then the default
  # behavior is to exit, which can cause a rejester
  # fork_worker parent to retry the whole job.
  # Default: false
  suppress_failures: false

  # This is prepended to every key given. Namely, all s3 URLs
  # are formed by `s3://{bucket}/{s3_path_prefix}/{input-key-name}`.
  # By default, it is empty.
  s3_path_prefix: some/s3/prefix

  # Paths to GPG keys. Note that if you're using verification,
  # then a decryption key must be given.
  # Default values: None
  gpg_encryption_key_path: keys/gpg-key.pub
  gpg_decryption_key_path: keys/gpg-key.private

  # GPG recipient.
  # Default value: trec-kba
  gpg_recipient: trec-kba

  # Removes the intermediate chunk file from disk.
  # Default: true.
  cleanup_tmp_files: true

  # A temporary directory where intermediate files may reside.
  # Uses your system's default tmp directory (usually `/tmp`)
  # by default.
  tmp_dir_path: /tmp

  # Change compression scheme; default is .xz for greatest
  # compression of archival content (S3 charges by the byte).
  # xz is also the slowest, so other options can make more
  # sense in some applications.  Snappy (.sz) is the fastest
  # and still much better than no compression at all.  Choices
  # are "xz", "sz", "gz", ""
  compression: xz
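The key composition described for s3_path_prefix can be sketched as (a hypothetical helper, not the writer's actual code):

```python
def s3_url(bucket, key_name, s3_path_prefix=''):
    """Compose the S3 URL as documented: the prefix is prepended to the
    key only when it is non-empty."""
    if s3_path_prefix:
        return 's3://%s/%s/%s' % (bucket, s3_path_prefix, key_name)
    return 's3://%s/%s' % (bucket, key_name)
```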
class streamcorpus_pipeline._s3_storage.to_s3_tarballs(config)[source]

The same as streamcorpus_pipeline._s3_storage.to_s3_chunks, except it puts stream items into a gzipped tarball instead of chunks. This writer does not do any encryption.

In addition to the required parameters for to_s3_chunks, this writer has three more required parameters: tarinfo_name (which supports output_name substitution semantics), tarinfo_uname and tarinfo_gname.

3.4.5. API

All stage objects are constructed with a single parameter, the dictionary of configuration specific to the stage. The stage objects can be passed directly to the streamcorpus_pipeline.Pipeline constructor.

class streamcorpus_pipeline.stages.StageRegistry(*args, **kwargs)[source]

Bases: _abcoll.MutableMapping

List of known stages for a pipeline.

This is a dictionary, and so registry[stagename] will retrieve the class object or constructor function for the stage. Each stage in the dictionary is a callable of a single parameter, the configuration dictionary, which returns a callable appropriate for its stage type. Typical patterns are actual classes and wrapper functions:

class a_stage(streamcorpus_pipeline.stages.Configured):
    config_name = 'a_stage'
    def __init__(self, config, *args, **kwargs):
        super(a_stage, self).__init__(config, *args, **kwargs)
        self.param = self.config.get('param')
    def __call__(self, si, context):
        logger.debug('param is %s', self.param)
        return si

def b_stage(config):
    param = config.get('param')
    def do_stage(si, context):
        logger.debug('param is %s', param)
        return si
    return do_stage

registry = StageRegistry()
registry['a_stage'] = a_stage
registry['b_stage'] = b_stage
tryload_stage(moduleName, functionName, name=None)[source]

Try to load a stage into self, ignoring errors.

If loading a module fails because of some subordinate load failure, just give a warning and move on. On success the stage is added to the stage dictionary.

Parameters:
  • moduleName (str) – name of the Python module
  • functionName (str) – name of the stage constructor
  • name (str) – name of the stage, defaults to functionName

Add external stages from the Python module in path.

path must be a path to a Python module source that contains a Stages dictionary, which is a map from stage name to callable.

Parameters:path (str) – path to the module file

Add external stages from the Python module mod.

If mod is a string, then it will be interpreted as the name of a module; otherwise it is an actual module object. The module should exist somewhere in sys.path. The module must contain a Stages dictionary, which is a map from stage name to callable.

Parameters:mod – name of the module or the module itself
Raises exceptions.ImportError:
 if mod cannot be loaded or does not contain Stages
init_stage(name, config)[source]

Construct and configure a stage from known stages.

name must be the name of one of the stages in this registry. config is the configuration dictionary of the containing object; its name member will be passed into the stage constructor.

Parameters:
  • name (str) – name of the stage
  • config (dict) – parent object configuration

Returns: callable stage

Raises exceptions.KeyError:

if name is not a known stage

class streamcorpus_pipeline.stages.PipelineStages(*args, **kwargs)[source]

Bases: streamcorpus_pipeline.stages.StageRegistry

The list of stages, preloaded with streamcorpus_pipeline stages.

class streamcorpus_pipeline.stages.Configured(config, *args, **kwargs)[source]

Bases: object

Any object containing a configuration.

The configuration is passed to the constructor.


The configuration dictionary for this object.

class streamcorpus_pipeline.stages.BatchTransform(config, *args, **kwargs)[source]

Bases: streamcorpus_pipeline.stages.Configured

Transform that acts on an entire streamcorpus.Chunk file.


Process a chunk file at a specific path.

Implementations should work in place, writing results out to the same chunk_path. The pipeline guarantees that this stage will have a key tmp_dir_path in its configuration, and this can be used to store an intermediate file which is then renamed over chunk_path.


Do an orderly shutdown of this stage.

If the stage spawns a child process, kill it, and do any other required cleanup. streamcorpus_pipeline.Pipeline._cleanup() will call this method on every batch transform stage, regardless of whether this is the currently running stage or not.

class streamcorpus_pipeline.stages.IncrementalTransform(config, *args, **kwargs)[source]

Bases: streamcorpus_pipeline.stages.Configured, _abcoll.Callable

Transform that acts on individual streamcorpus StreamItem objects.

The pipeline can run any stage with an appropriate __call__() method. The __call__() or process_item() methods are called once per stream item, reusing the same transform object.

process_item(stream_item, context)[source]

Process a single streamcorpus.StreamItem object.

context is shared across all stages. It is guaranteed to contain i_str nominally containing the original input file name and data nominally containing the auxiliary data from the work item.

This function may modify stream_item in place, construct a new item and return it, or return None to drop the item.


Returns: the same or a different stream item, or None


Do an orderly shutdown of this stage.

Incremental transforms are generally simple Python code and a complicated shutdown is not required. The pipeline does not call this method.
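A minimal incremental transform following this protocol might look like this (a hypothetical stage, not part of the package):

```python
class drop_missing_raw(object):
    """Hypothetical incremental transform: drop stream items whose body
    has no raw content, and pass everything else through unchanged."""
    def __init__(self, config, *args, **kwargs):
        self.config = config

    def __call__(self, si, context):
        if not getattr(si.body, 'raw', None):
            return None  # returning None drops the item
        return si       # unchanged items are passed through
```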

3.5. rejester integration

Support for running pipeline tasks under rejester.

Since running pipeline stages can be expensive, particularly if busy third-party stages are run, it can be desirable to run the pipeline under the rejester distributed-computing framework. This module provides rejester “run” and “terminate” functions that run the pipeline. A typical rejester work spec will look like:

work_spec = {
    'name': 'pipeline',
    'desc': 'run streamcorpus_pipeline',
    'min_gb': 8,
    'config': yakonfig.get_global_config(),
    'module': 'streamcorpus_pipeline._rejester',
    'run_function': 'rejester_run_function',
    'terminate_function': 'rejester_terminate_function',
}

Work units have an input filename or other description as a key, and any non-empty dictionary as their data. They are processed using streamcorpus_pipeline.Pipeline._process_task(). Pass the work spec and work unit dictionaries to rejester.TaskMaster.update_bundle() to inject the work definitions.
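For example, a set of work units might look like this (the paths and the 'start_count' data field are invented for illustration; any non-empty dictionary works as data):

```python
# Hypothetical work units: input chunk paths as keys, non-empty
# dictionaries as their data.
work_units = {
    '/data/chunks/input-000.sc': {'start_count': 0},
    '/data/chunks/input-001.sc': {'start_count': 0},
}

# Injecting the definitions (sketch; task_master would be a
# rejester.TaskMaster instance and work_spec the dict shown above):
# task_master.update_bundle(work_spec, work_units)
```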

Note that this framework makes no effort to actually distribute the data around. Either only run rejester jobs on one system, or ensure that all systems running rejester workers have a shared filesystem such as NFS, or use network-based reader and writer stages such as from_s3_chunks and to_kvlayer.