Source code for streamcorpus_pipeline._hyperlink_labels

#!/usr/bin/env python
'''
Pipeline stage for extracting hyperlinks to particular domains as
labels generated by the author.

This software is released under an MIT/X11 open source license.

Copyright 2012-2014 Diffeo, Inc.
'''
from __future__ import absolute_import
import re
import sys
import logging

from streamcorpus import add_annotation, Offset, OffsetType, Annotator, Target, Label
from streamcorpus_pipeline._clean_visible import make_clean_visible
from streamcorpus_pipeline.stages import Configured

logger = logging.getLogger(__name__)

## Matches an HTML anchor tag inside a chunk of text and captures:
##   before     -- everything preceding the tag (lazy)
##   ahref      -- the whole opening tag, from "<a" through ">"
##   preequals  -- whitespace between "href" and "="
##   postequals -- whitespace between "=" and the value
##   quote      -- the optional quote character that opens the value
##   href       -- the value itself: a run of characters other than a
##                 double quote, closed by the same quote (if any)
##   posthref   -- the rest of the tag after the value (lazy)
##   anchor     -- everything after the tag, to the end of the input
## Case-insensitive.  Raw strings are used so that the regex escapes
## (\s, \<, \>, \n) are interpreted by the re module rather than being
## invalid (or translated) Python string escapes.
anchors_re = re.compile(
    r'''(?P<before>(.|\n)*?)'''
    r'''(?P<ahref>\<a\s+(.|\n)*?href'''
    r'''(?P<preequals>(\s|\n)*)=(?P<postequals>(\s|\n)*)'''
    r'''(?P<quote>("|')?)(?P<href>[^"]*)(?P=quote)'''
    r'''(?P<posthref>(.|\n)*?)\>)'''
    r'''(?P<anchor>(.|\n)*)''',
    re.I)

def read_to( idx_bytes, stop_bytes=None, run_bytes=None ):
    '''
    Consume idx_bytes until reaching a byte in stop_bytes, a byte not
    in run_bytes, or the end of idx_bytes.

    :param idx_bytes: iterator of (idx, byte) pairs; it is advanced
    in place, and the terminal byte is consumed from it

    :param stop_bytes: optional collection of bytes, any of which
    ends the read

    :param run_bytes: optional collection of bytes that may be read;
    any byte outside of it ends the read

    :returns: tuple(idx, collected, terminal_byte) where collected is
    the bytes read before the terminal byte (joined, and excluding
    it), terminal_byte is the byte that ended the read, and idx is
    its index.  When idx_bytes runs out before a terminal byte is
    seen, idx and terminal_byte are both None.
    '''
    vals = []
    ## a plain for loop works with any iterator on both Python 2 and
    ## 3, unlike calling the Python-2-only .next() method
    for idx, next_b in idx_bytes:
        ## stop when we see any byte in stop_bytes
        if stop_bytes and next_b in stop_bytes:
            break
        ## stop when we see any byte not in run_bytes
        if run_bytes and next_b not in run_bytes:
            break
        ## assemble the ret_val
        vals.append( next_b )
    else:
        ## idx_bytes ran out without reaching a terminal byte
        idx = None
        next_b = None

    ## return whatever we have assembled
    return idx, b''.join(vals), next_b


def iter_attrs( idx_bytes ):
    '''
    called when idx_bytes is just past "<a " inside an HTML anchor tag

    Reads attributes from idx_bytes with read_to until the ">" that
    closes the tag, or until idx_bytes is exhausted.  A value may be
    wrapped in single or double quotes; an unquoted value ends at the
    first whitespace byte.  In both cases a ">" ends the value and
    the tag.

    :param idx_bytes: iterator of (idx, byte) pairs, advanced in place

    generates tuple(end_idx, attr_name, attr_value), where end_idx is
    the idx reported by the last read_to call for that attribute
    (None if idx_bytes ran out) and both strings are stripped of
    surrounding whitespace
    '''
    whitespace = [' ', '\t', '\n', '\r']

    ## read to the end of the "A" tag
    while 1:
        idx, attr_name, next_b = read_to(idx_bytes, ['=', '>'])

        ## stop if we hit the end of the tag, or end of idx_bytes
        if next_b is None or next_b == '>':
            return

        ## skip whitespace after "="; first_b is the first byte that
        ## is not whitespace, or None if idx_bytes ran out
        idx, _space, first_b = read_to(idx_bytes, run_bytes=whitespace)

        if first_b is None or first_b == '>':
            ## the input or the tag ended before any value appeared,
            ## so report an empty value and stop
            yield idx, attr_name.strip(), b''
            return

        if first_b in ['"', "'"]:
            ## quoted value: read to the balancing quote
            attr_vals = []
            stop_bytes = [first_b, '>']
        else:
            ## caught start of the property value, which runs to the
            ## next whitespace byte
            attr_vals = [first_b]
            stop_bytes = whitespace + ['>']

        idx, attr_val, next_b = read_to(idx_bytes, stop_bytes)
        attr_vals.append( attr_val )

        yield idx, attr_name.strip(), b''.join(attr_vals).strip()

        ## the value ran into the end of the tag or of idx_bytes, so
        ## anything that follows is not part of this tag
        if next_b is None or next_b == '>':
            return



if __name__ == '__main__':
    ## ad hoc command-line check: read HTML from stdin and print the
    ## href and anchor groups of every anchors_re match, one per line
    clean_html = sys.stdin.read()
    for m in anchors_re.finditer(clean_html):
        ## a single parenthesized argument prints the same text under
        ## both the Python 2 print statement and the Python 3 function
        print('%s %s' % (m.group('href'), m.group('anchor')))