7. rejester — Redis-based distributed work manager

Rejester is a distributed work manager, using the Redis in-memory database as shared state. In typical use, some number of systems run rejester worker processes, which query the central database to find new work; anything that can reach the database can submit jobs. The standard rejester work system can also directly deliver work units to programs without using the dedicated worker process. For specialized applications there is also a globally atomic string-based priority queue.

To use a rejester-based application:

  1. Create a YAML configuration file as described below.
  2. Run rejester mode run once on any system to enable rejester.
  3. Run rejester_worker on some number of systems with identical copies of the configuration file. These may be anywhere, but must be able to reach the Redis server. Each will start as many workers as the system has CPU cores.
  4. Run some tool that generates rejester work units. For instance, the streamcorpus_directory tool included in streamcorpus-pipeline generates work units to process text files.
  5. Run rejester summary and other sub-commands of the rejester tool to get information about the job’s progress.
  6. Run rejester mode terminate to instruct all of the workers to shut down.

7.1. rejester tool

Command-line rejester management tool.

This provides a centralized user interface to explore and affect various parts of the rejester system. Of note, this is the principal way to start a worker process to run jobs. Typical use is:

  1. Set up a configuration file containing rejester configuration, such as:

    rejester:
      app_name: rejester
      namespace: mynamespace
      registry_addresses: [ "redis.example.com:6379" ]
    
  2. Run some process to generate rejester tasks.

  3. Copy the configuration files to other systems as required, and run rejester_worker -c config.yaml to start workers.

  4. Run (anywhere, but only once) rejester -c config.yaml mode run to actually start execution.

  5. Wait for the work units to finish.

  6. Run (anywhere, but only once) rejester -c config.yaml mode terminate to ask the workers to shut down.

In the configuration, namespace and registry_addresses are required unless passed on the command line. Additionally, registry_addresses can be detected from environment variables REDIS_PORT_6379_TCP_ADDR and REDIS_PORT_6379_TCP_PORT, which will be set by Docker if a rejester program is run in a container that is --link connected to another container with the name redis.
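
The environment-variable fallback can be sketched as follows. The helper function is hypothetical, not part of rejester, and the real detection logic may differ in detail:

```python
import os

def detect_registry_addresses(config):
    # Hypothetical helper mirroring the documented fallback: use the
    # configured value if present, otherwise the Docker --link
    # environment variables for a container named "redis".
    if config.get('registry_addresses'):
        return config['registry_addresses']
    addr = os.environ.get('REDIS_PORT_6379_TCP_ADDR')
    port = os.environ.get('REDIS_PORT_6379_TCP_PORT', '6379')
    if addr:
        return ['%s:%s' % (addr, port)]
    raise ValueError('no registry_addresses configured')
```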

The rejester tool supports the standard --config, --dump-config, --verbose, --quiet, and --debug options. This, and any other tool that integrates with rejester, supports the following additional options:

--app-name <name>

Provide the application name for database access. This is combined with the namespace string.

--namespace <name>

Provide the namespace name. This is qualified by the application name. All work units and workers are associated with a single namespace in a single application.

--registry-address <host:port>

Provide the location of a Redis server.

If no further options are given, start an interactive shell to monitor and control rejester. Alternatively, a single command can be given on the command line. The tool provides the following commands:

summary

Print a tabular listing of all of the running work specs and how many work units are in each state.

load --work-spec file.yaml --work-units file2.json

Loads a set of work units. The work spec (-w) and work units (-u) must both be provided as external files. The work spec file is the YAML serialization of a work spec definition; see rejester.TaskMaster for details of what this looks like. The work unit file is a series of JSON records, one per line, each a single-key dictionary of the form {"key": {"unit": "definition", "dictionary": "values"}}.
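
A work unit file in this format can be generated with the standard json module, one record per line. The unit names and contents here are illustrative:

```python
import json

# Illustrative work units: keys are unit names, values are the unit
# definition dictionaries that the run function will receive.
work_units = {
    'unit-001': {'input': 'file1.txt'},
    'unit-002': {'input': 'file2.txt'},
}

# One JSON record per line, each a single-key dictionary.
lines = [json.dumps({key: data}) for key, data in sorted(work_units.items())]
# '\n'.join(lines) is the content of the file passed with -u
```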

delete

Deletes the entire namespace. Prompts for confirmation, unless -y or --yes is given as an argument.

work_specs

Prints out the names of all of the work specs.

work_spec --work-spec-name name

Prints out the definition of a work spec, assuming it has already been loaded. The work spec name may be given with a --work-spec-name or -W option; or, a --work-spec or -w option may name a work spec file compatible with the load command.

status --work-spec-name name

Prints out a summary of the jobs in some work spec. Provide the work spec name the same way as for the work_spec command.

work_units --work-spec-name name [--status status]

Prints out a listing of the work units that have not yet completed for some work spec. Provide the work spec name the same way as for the work_spec command. This includes the work units that the status command would report as “available” or “pending”, but not other statuses. If -s or --status is given with one of the status strings “available”, “pending”, “blocked”, “failed”, or “finished”, only print work units with that status. If --details is given as an argument, print the definition of the work unit (and the traceback for failed work units) along with its name.

failed --work-spec-name name

Identical to work_units --status failed. May be removed at a future time.

work_unit --work-spec-name name unitname

Prints out basic details for a work unit, in any state.

retry --work-spec-name name [--all|unitname...]

Retry failed work units, removing their traceback and moving them back to “available” status. If -a or --all is given, retry all failed work units; otherwise, only retry the specific work units named on the command line.

clear --work-spec-name name [--status status] [unitname...]

Remove work units from the system. With no additional arguments, remove all work units from the specified work spec. If -s or --status is given, remove only work units with this status; see the description of the work_units subcommand for possible values. If any unitname values are given, only remove those specific work units, provided they in fact have the specified status.

mode [idle|run|terminate]

With no arguments, print out the current rejester mode; otherwise set it. In “run” mode, workers will start new jobs as they become available. In “idle” mode, workers will not start new jobs but also will continue to execute; if the mode is switched back to “run” they will start running jobs again. In “terminate” mode, workers will stop execution as soon as they finish their running jobs.
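
The mode semantics can be summarized as a small decision function. This is an illustrative sketch of the worker's behavior, not rejester's actual implementation:

```python
def worker_action(mode, has_running_job):
    # Illustrative summary of how a worker reacts to the global mode;
    # the real loop lives in rejester.workers.
    if mode == 'RUN':
        return 'start new jobs'
    if mode == 'IDLE':
        return 'stay alive, start nothing'
    if mode == 'TERMINATE':
        return 'finish current job, then exit' if has_running_job else 'exit'
    raise ValueError('unknown mode: %r' % mode)
```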

run_one

Get a single task, and run it.

global_lock [--purge]

The system maintains a global lock to make sequences of database requests atomic, but if workers fail, the global lock can be leaked. With no arguments, prints the worker ID that owns the global lock and details that are known about it. If -p or --purge is given, clear out the lock if anybody holds it.

workers

List all of the known workers. With --all include workers that haven’t checked in recently. With --details include all known details.

7.2. rejester_worker tool

Command-line rejester_worker tool for launching the worker daemon.

--pidfile /path/to/file.pid

If --pidfile is specified, the process ID of the worker is written to the named file, which must be an absolute path.

--logpath /path/to/file.log

If --logpath is specified, log messages from the worker will be written to the specified file, which again must be an absolute path; this is in addition to any logging specified in the configuration file.

Start a worker as a background task. The worker may be shut down by globally switching to mode terminate, or by kill $(cat /path/to/file.pid).

7.3. Configuration

Rejester uses yakonfig for its configuration. The relevant section of the configuration file looks like:

rejester:
  # These two options are required
  registry_addresses: [ "redis.example.com:6379" ]
  namespace: rejester

  # The following are optional, defaults shown
  app_name: rejester
  default_lifetime: 900
  enough_memory: false
  worker: fork_worker

registry_addresses indicates the location of the Redis server. While this is a list, only the first value is used. Also note that YAML syntax requires quoting the host:port string, lest it be interpreted as a dictionary.
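
Since only the first value of the list is used, splitting the host:port string for a Redis client is straightforward. This is a hypothetical helper, not part of rejester; it assumes Redis's standard port 6379 as the default:

```python
def parse_address(address):
    # Illustrative helper: split a "host:port" registry address string.
    # Assumes Redis's standard port 6379 when no port is given.
    host, _, port = address.partition(':')
    return host, int(port) if port else 6379
```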

app_name and namespace identify the specific application in use. Multiple applications can share the same Redis server so long as they have distinct namespace strings. app_name is currently fixed at rejester and setting a different value has no effect.

default_lifetime indicates how long a job is allowed to run before it must call rejester._task_master.WorkUnit.update(). If a job runs beyond this timeout it will return from the “pending” list to the “available” list, and another worker may start working on it.

When a worker requests a job, if the system does not have enough memory to satisfy a given work spec’s min_gb request, that work spec is skipped. Setting enough_memory to true overrides this check.

worker specifies the worker implementation to use in rejester_worker. Valid options are fork_worker or multi_worker. fork_worker has additional configuration options; see ForkWorker for details. multi_worker is less stable and never allows jobs to time out, but will start a set of jobs more quickly.

7.4. Task system

7.4.1. Core API

class rejester.TaskMaster(config)[source]

Bases: object

Control object for the rejester task queue.

The task queue consists of a series of work specs, which include configuration information, and each work spec has some number of work units attached to it. Both the work specs and work units are defined as (non-empty) dictionaries. The work spec must have keys name and min_gb, but any other properties are permitted. Conventionally desc contains a description of the job and config contains the top-level global configuration.

There are three ways to use TaskMaster:

  1. Create work specs and work units with update_bundle(). Directly call get_work() to get work units back. Based on the information stored in the work spec and work unit dictionaries, do the work manually, and call WorkUnit.finish() or WorkUnit.fail() as appropriate.
  2. Create work specs and work units with update_bundle(). The work spec must contain three keys, module naming a Python module, and run_function and terminate_function each naming functions of a single parameter in that module. Directly call get_work() to get work units back, then call their WorkUnit.run() function to execute them. See the basic example in WorkUnit.fail().
  3. Create work specs and work units with update_bundle(), including the Python information. Use one of the standard worker implementations in rejester.workers to actually run the job.

Most applications will use the third option, the standard worker system. In all three cases populating and executing jobs can happen on different systems, or on multiple systems in parallel. To use the standard worker system, create a Python module:

def rejester_run(work_unit):
    # Does the actual work for `work_unit`.
    # Must be a top-level function in the module.
    # The work unit will succeed if this returns normally,
    # and will fail if this raises an exception.
    pass

def rejester_terminate(work_unit):
    # Called only if a signal terminates the worker.
    # This usually does nothing, but could kill a known subprocess.
    pass

The work spec would look something like:

work_spec = {
    'name': 'rejester_sample',
    'desc': 'A sample rejester job.',
    'min_gb': 1,
    'config': yakonfig.get_global_config(),
    'module': 'name.of.the.module.from.above',
    'run_function': 'rejester_run',
    'terminate_function': 'rejester_terminate',
}

The work units can be any non-empty dictionaries that are meaningful to the run function. WorkUnit objects are created with their key and data fields set to individual keys and values from the parameter to update_bundle().
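
The run-function contract can be exercised without a Redis server by driving the functions directly. This is a pure-Python stand-in for the worker loop; the real system passes WorkUnit objects rather than bare key/data pairs, and rejester_run here is a toy example:

```python
def rejester_run(key, data, results):
    # Toy stand-in for a real run function.  The actual function
    # receives a single WorkUnit argument with .key and .data fields;
    # here key/data are passed directly for illustration.
    results[key] = data['value'] * 2

# What a worker conceptually does: pull each unit, run it, and record
# success (WorkUnit.finish()) or failure (WorkUnit.fail()).
work_units = {'a': {'value': 1}, 'b': {'value': 2}}
results = {}
statuses = {}
for key, data in work_units.items():
    try:
        rejester_run(key, data, results)
        statuses[key] = 'finished'
    except Exception:
        statuses[key] = 'failed'
```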

A work unit can be in one of five states. It is available if it has been added to the queue but nobody is working on it. It is pending if somebody is currently working on it. When they finish, it will become either finished or failed. If dependencies are added between tasks using add_dependent_work_units(), a task can also be blocked.

The general flow for a rejester application is to set_mode() to IDLE, then add work units using update_bundle(), and set_mode() to RUN. get_work() will return work units until all have been consumed. set_mode() to TERMINATE will instruct workers to shut down.

This object keeps very little state locally and can safely be used concurrently, including from multiple systems. Correspondingly, any settings here, including set_mode(), are persistent even beyond the end of the current process.

__init__(config)[source]

Create a new task master.

This is a lightweight object, and it is safe to have multiple objects concurrently accessing the same rejester system, even on multiple machines.

Parameters:config (dict) – Configuration for the task master, generally the contents of the rejester block in the global configuration
config = None

Configuration for the task master

registry = None

Mid-level Redis interface

worker_id = None

Worker ID, if this is tied to a worker

default_lifetime = None

Amount of time workers get between WorkUnit.update() calls

enough_memory = None

Override available-memory checks

RUN = 'RUN'

Mode constant instructing workers to do work

IDLE = 'IDLE'

Mode constant instructing workers to not start new work

TERMINATE = 'TERMINATE'

Mode constant instructing workers to shut down

AVAILABLE = 1
BLOCKED = 2
PENDING = 3
FINISHED = 4
FAILED = 5
set_mode(mode)[source]

Set the global mode of the rejester system.

This must be one of the constants TERMINATE, RUN, or IDLE. TERMINATE instructs any running workers to do an orderly shutdown, completing current jobs then exiting. IDLE instructs workers to stay running but not start new jobs. RUN tells workers to do actual work.

Parameters:mode (str) – new rejester mode
Raises rejester.exceptions.ProgrammerError:
 on invalid mode
get_mode()[source]

Get the global mode of the rejester system.

Returns:rejester mode, IDLE if unset
idle_all_workers()[source]

Set the global mode to IDLE and wait for workers to stop.

This can wait arbitrarily long before returning. The worst case in “normal” usage involves waiting five minutes for a “lost” job to expire; a well-behaved but very-long-running job can extend its own lease further, and this function will not return until that job finishes (if ever).

Deprecated since version 0.4.5: There isn’t an obvious use case for this function, and its “maybe wait forever for something out of my control” nature makes it hard to use in real code. Polling all of the work specs and their num_pending() in application code if you really needed this operation would have the same semantics and database load.

mode_counts()[source]

Get the number of workers in each mode.

This returns a dictionary where the keys are mode constants and the values are a simple integer count of the number of workers in that mode.

workers(alive=True)[source]

Get a listing of all workers.

This returns a dictionary mapping worker ID to the mode constant for their last observed mode.

Parameters:alive (bool) – if true (default), only include workers that have called Worker.heartbeat() sufficiently recently
get_heartbeat(worker_id)[source]

Get the last known state of some worker.

If the worker never existed, or the worker’s lifetime has passed without it heartbeating, this will return an empty dictionary.

Parameters:worker_id (str) – worker ID
Returns:dictionary of worker state, or empty dictionary
See:Worker.heartbeat()
dump()[source]

Print the entire contents of this to debug log messages.

This is really only intended for debugging. It could produce a lot of data.

classmethod validate_work_spec(work_spec)[source]

Check that work_spec is valid.

It must at the very minimum contain a name and min_gb.

Raises rejester.exceptions.ProgrammerError:
 if it isn’t valid
num_available(work_spec_name)[source]

Get the number of available work units for some work spec.

These are work units that could be returned by get_work(): they are not complete, not currently executing, and not blocked on some other work unit.

num_pending(work_spec_name)[source]

Get the number of pending work units for some work spec.

These are work units that some worker is currently working on (hopefully; it could include work units assigned to workers that died and that have not yet expired).

num_blocked(work_spec_name)[source]

Get the number of blocked work units for some work spec.

These are work units that are the first parameter to add_dependent_work_units() where the job they depend on has not yet completed.

num_finished(work_spec_name)[source]

Get the number of finished work units for some work spec.

These have completed successfully with WorkUnit.finish().

num_failed(work_spec_name)[source]

Get the number of failed work units for some work spec.

These have completed unsuccessfully with WorkUnit.fail().

num_tasks(work_spec_name)[source]

Get the total number of work units for some work spec.

status(work_spec_name)[source]

Get a summary dictionary for some work spec.

The keys are the strings num_available(), num_pending(), num_blocked(), num_finished(), num_failed(), and num_tasks(), and the values are the values returned from those functions.

list_work_specs(limit=None, start=None)[source]

Get a list of (work spec name, work spec) pairs.

Each work spec name is paired with the actual work spec definition.

Returns ([(spec name, spec), ...], next start value).

iter_work_specs(limit=None, start=None)[source]

Yield work spec dictionaries one at a time.

get_work_spec(work_spec_name)[source]

Get the dictionary defining some work spec.

get_work_units(work_spec_name, work_unit_keys=None, state=None, limit=None, start=None)[source]

Get (key, value) pairs for work units.

If state is not None, then it should be one of the string state constants, and this function will return a list of pairs of work unit key and value for work units in that state. If start is not None, then this many work units are skipped; if limit is not None then at most this many work units will be returned.

If state is None then all work units in all states will be returned.

Parameters:
  • work_spec_name (str) – name of work spec to query
  • work_unit_keys (list) – if not None, only return these specific work units
  • state (str) – string state constant, or None for all work units in all states
  • limit (int) – maximum number of items to return
  • start (int) – skip this many items before returning any
Returns:

list of pairs of (work unit key, work unit data)

list_work_units(work_spec_name, start=0, limit=None)[source]

Get a dictionary of work units for some work spec.

The dictionary is from work unit name to work unit definition. Only work units that have not been completed (“available” or “pending” work units) are included.

list_available_work_units(work_spec_name, start=0, limit=None)[source]

Get a dictionary of available work units for some work spec.

The dictionary is from work unit name to work unit definition. Only work units that have not been started, or units that were started but did not complete in a timely fashion, are included.

list_pending_work_units(work_spec_name, start=0, limit=None)[source]

Get a dictionary of in-progress work units for some work spec.

The dictionary is from work unit name to work unit definition. Units listed here are expected to be in progress on some worker.

list_blocked_work_units(work_spec_name, start=0, limit=None)[source]

Get a dictionary of blocked work units for some work spec.

The dictionary is from work unit name to work unit definition. Work units included in this list are blocked because they were listed as the first work unit in add_dependent_work_units(), and the work unit(s) they depend on have not completed yet. This function does not say why work units are blocked; it merely reports that they are.

list_finished_work_units(work_spec_name, start=0, limit=None)[source]

Get a dictionary of finished work units for some work spec.

The dictionary is from work unit name to work unit definition. Only work units that have been successfully completed are included.

list_failed_work_units(work_spec_name, start=0, limit=None)[source]

Get a dictionary of failed work units for some work spec.

The dictionary is from work unit name to work unit definition. Only work units that have completed unsuccessfully are included.

del_work_units(work_spec_name, work_unit_keys=None, state=None, all=False)[source]

Delete work units from a work spec.

The parameters are considered in order as follows:

  • If all is True, then all work units in work_spec_name are deleted; otherwise
  • If state is not None, then all work units in the named state are deleted; otherwise
  • If work_unit_keys are specified, then those specific work units are deleted; otherwise
  • Nothing is deleted.
Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_keys (list) – if not None, only delete these specific keys
  • state (str) – only delete work units in this state
  • all (bool) – if true, delete all work units
Returns:

number of work units deleted
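
The documented precedence can be modeled as a small selection function. This is a hypothetical mirror of the rules above, not the actual implementation; units maps work unit key to its state string:

```python
def select_for_deletion(units, work_unit_keys=None, state=None, all=False):
    # Mirrors the documented precedence of del_work_units():
    # all, then state, then work_unit_keys, then nothing.
    if all:
        return set(units)
    if state is not None:
        return {k for k, s in units.items() if s == state}
    if work_unit_keys:
        return set(work_unit_keys) & set(units)
    return set()
```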

clear()[source]

Delete everything. Deletes all work units and all work specs.

remove_available_work_units(work_spec_name, work_unit_names)[source]

Remove some work units in the available queue.

If work_unit_names is None (which must be passed explicitly), all available work units in work_spec_name are removed; otherwise only the specific named work units will be.

Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_names (list) – names of the work units, or None for all in work_spec_name
Returns:

number of work units removed

remove_pending_work_units(work_spec_name, work_unit_names)[source]

Remove some work units in the pending list.

If work_unit_names is None (which must be passed explicitly), all pending work units in work_spec_name are removed; otherwise only the specific named work units will be.

Note that this function has the potential to confuse workers if they are actually working on the work units in question. If you have ensured that the workers are dead and you would otherwise be waiting for the leases to expire before calling remove_available_work_units(), then this is a useful shortcut.

Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_names (list) – names of the work units, or None for all in work_spec_name
Returns:

number of work units removed

remove_blocked_work_units(work_spec_name, work_unit_names)[source]

Remove some work units in the blocked list.

If work_unit_names is None (which must be passed explicitly), all blocked work units in work_spec_name are removed; otherwise only the specific named work units will be.

Note that none of the “remove” functions will restart blocked work units, so if you have called e.g. remove_available_work_units() for a predecessor job, you may need to also call this method for its successor.

Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_names (list) – names of the work units, or None for all in work_spec_name
Returns:

number of work units removed

remove_failed_work_units(work_spec_name, work_unit_names)[source]

Remove some failed work units.

If work_unit_names is None (which must be passed explicitly), all failed work units in work_spec_name are removed; otherwise only the specific named work units will be.

Also consider retry() to move failed work units back into the available queue.

Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_names (list) – names of the work units, or None for all in work_spec_name
Returns:

number of work units removed

remove_finished_work_units(work_spec_name, work_unit_names)[source]

Remove some finished work units.

If work_unit_names is None (which must be passed explicitly), all finished work units in work_spec_name are removed; otherwise only the specific named work units will be.

Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_names (list) – names of the work units, or None for all in work_spec_name
Returns:

number of work units removed

get_work_unit_status(work_spec_name, work_unit_key)[source]

Get a high-level status for some work unit.

The return value is a dictionary. The only required key is status, which could be any of:

missing
The work unit does not exist anywhere
available
The work unit is available for new workers; additional keys include expiration (may be 0)
pending
The work unit is being worked on; additional keys include expiration and worker_id (usually)
blocked
The work unit is waiting for some other work units to finish; additional keys include depends_on
finished
The work unit has completed
failed
The work unit failed; additional keys include traceback
Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_key (str) – name of the work unit
Returns:

dictionary description of summary status

inspect_work_unit(work_spec_name, work_unit_key)[source]

Get the data for some work unit.

Returns the data for that work unit, or None if it really can’t be found.

Parameters:
  • work_spec_name (str) – name of the work spec
  • work_unit_key (str) – name of the work unit
Returns:

definition of the work unit, or None

reset_all(work_spec_name)[source]

Restart a work spec.

This calls idle_all_workers(), then moves all finished jobs back into the available queue.

Deprecated since version 0.4.5: See idle_all_workers() for problems with that method. This also ignores failed jobs and work unit dependencies. In practice, whatever generated a set of work units initially can recreate them easily enough.

update_bundle(work_spec, work_units, nice=0)[source]

Load a work spec and some work units into the task list.

Updates the work spec and work units. Overwrites any existing work spec with the same work_spec['name'], and similarly overwrites any work unit with the same key.

Parameters:
  • work_spec (dict) – Work spec dictionary
  • work_units (dict of dict) – Keys are used as WorkUnit.key, values are used as WorkUnit.data
  • nice (int) – Niceness of work_spec, higher value is lower priority
set_work_spec(work_spec)[source]

Load or replace a work spec. work_spec is a dictionary; work_spec['name'] is used as work_spec_name in other API calls, and work_spec['nice'] is used for prioritization, if set.

add_work_units(work_spec_name, work_unit_key_vals)[source]

Add work units to an existing work spec. work_unit_key_vals is a list of (work_unit_key, work_unit_data) pairs.

add_dependent_work_units(work_unit, depends_on, hard=True)[source]

Add work units, where one prevents execution of the other.

The two work units may be attached to different work specs, but both must be in this task master’s namespace. work_unit and depends_on are both tuples of (work spec name, work unit name, work unit dictionary). The work specs must already exist; they may be created with update_bundle() with an empty work unit dictionary. If a work unit dictionary is provided with either work unit, then this defines that work unit, and any existing definition is replaced. Either or both work unit dictionaries may be None, in which case the work unit is not created if it does not already exist. In this last case, the other work unit will be added if specified, but the dependency will not be added, and this function will return False. In all other cases, this dependency is added in addition to all existing dependencies on either or both work units, even if the work unit dictionary is replaced.

work_unit will not be executed or reported as available via get_work() until depends_on finishes execution. If the depends_on task fails, then the hard parameter describes what happens: if hard is True then work_unit will also fail, but if hard is False then work_unit will be able to execute once depends_on has completed some execution attempt, even a failed one.

Calling this function with hard=True suggests an ordered sequence of tasks where the later task depends on the output of the earlier tasks. Calling this function with hard=False suggests a cleanup task that must run after this task (and, likely, several others) are done, but doesn’t specifically depend on its result being available.

Parameters:
  • work_unit (tuple of (str,str,dict)) – “Later” work unit to execute
  • depends_on (tuple of (str,str,dict)) – “Earlier” work unit to execute
  • hard (bool) – if True, then work_unit automatically fails if depends_on fails
Returns:

True, unless one or both of the work units didn’t exist and weren’t specified, in which case, False

Raises rejester.exceptions.NoSuchWorkSpecError:
 

if a work spec was named that doesn’t exist
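
The hard/soft semantics can be summarized in a small decision function. This is an illustrative sketch, not part of rejester's API; the status strings follow the five states described earlier:

```python
def dependent_outcome(depends_on_status, hard):
    # Status of the dependent work unit once its predecessor settles,
    # per the documented hard/soft dependency rules.
    if depends_on_status == 'finished':
        return 'available'
    if depends_on_status == 'failed':
        return 'failed' if hard else 'available'
    return 'blocked'  # predecessor still available/pending
```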

retry(work_spec_name, *work_unit_names)[source]

Move failed work unit(s) back into the “available” queue.

The work unit will be available to execute immediately. If other tasks had depended on it, those dependencies will not be recreated.

Parameters:
  • work_spec_name (str) – name of the (existing) work spec
  • work_unit_names (str) – name(s) of the (failed) work unit(s)
Raises:
  • rejester.NoSuchWorkSpecError – if work_spec_name is invalid
  • rejester.NoSuchWorkUnitError – if work_spec_name is valid but any of the work_unit_names are not a failed work unit
  • rejester.LockError – if the registry lock could not be obtained
nice(work_spec_name, nice)[source]

Change the priority of an existing work spec.

get_work(worker_id, available_gb=None, lease_time=None, work_spec_names=None, max_jobs=None)[source]

Obtain a WorkUnit instance for a worker, based on the worker process’s available memory.

Parameters:
  • worker_id – unique identifier string for a worker to which a WorkUnit will be assigned, if available.
  • available_gb – number of gigabytes of RAM available to this worker
  • lease_time – how many seconds to lease a WorkUnit
  • max_jobs (int) – maximum number of work units to return (default 1)
  • work_spec_names – limit to queue from one work_spec. NOT IMPLEMENTED: this implementation will return work from any work spec.
get_assigned_work_unit(worker_id, work_spec_name, work_unit_key)[source]

Get a specific WorkUnit that has already been assigned to a particular worker_id.

get_child_work_units(worker_id)[source]

Get work units assigned to a worker’s children.

Returns a dictionary mapping worker ID to WorkUnit. If a child exists but is idle, that worker ID will map to None. The work unit may already be expired or assigned to a different worker; this will be reflected in the returned WorkUnit.

This may write back to the underlying data store to clean up stale children that have not unregistered themselves but no longer exist in any form.

worker_register(worker_id, mode=None, lifetime=6000, environment=None, parent=None)[source]
worker_heartbeat(worker_id, mode=None, lifetime=6000, environment=None, parent=None)[source]
worker_unregister(worker_id, parent=None)[source]
class rejester.WorkUnit(registry, work_spec_name, key, data, worker_id=None, expires=None, default_lifetime=900)[source]

Bases: object

A single unit of work being executed.

These are created by the rejester system; the standard worker system will pass objects of this type to the named run function. If some code calls:

task_master.update_bundle({ 'name': 'work_spec_name', ... },
                          { 'key': data, ... })

Then when the work unit is executed, this object will have the provided work_spec_name, key, and data, with remaining values being provided by the system.

In the standard worker system, the worker will call finish() or fail() as appropriate. The run function should examine spec, key, and data to figure out what to do.

__init__(registry, work_spec_name, key, data, worker_id=None, expires=None, default_lifetime=900)[source]

Create runtime data for a new work unit.

In most cases application code will not need to call this directly, but should expect to be passed work units created by the system.

Parameters:
  • registry (rejester.Registry) – Mid-level Redis interface
  • work_spec_name (str) – Name of the work spec
  • key (str) – Name of the work unit
  • data (dict) – Data provided for the work unit
  • worker_id (str) – Worker doing this work unit
  • expires (int) – Latest time this work unit can still be running
  • default_lifetime (int) – Time update() adds by default
worker_id = None

Worker doing this work unit

registry = None

Mid-level Redis interface

work_spec_name = None

Name of the work spec

key = None

Name of the work unit

finished = None

Has this work unit called finish()?

failed = None

Has this work unit called fail()?

expires = None

Time (as time.time()) when this work unit must finish

default_lifetime = None

Time update() adds by default

spec

Actual work spec.

This is retrieved from the database on first use, and in some cases a worker can be mildly more efficient if it avoids using this.

module

Python module to run the job.

This is used by run() and the standard worker system. If the work spec contains keys module, run_function, and terminate_function, then this contains the Python module object named as module; otherwise this contains None.

run()[source]

Actually runs the work unit.

This is called by the standard worker system, generally once per work unit. It requires the work spec to contain keys module, run_function, and terminate_function. It looks up run_function in module and calls that function with self as its only parameter.

terminate()[source]

Kills the work unit.

This is called by the standard worker system, but only in response to an operating system signal. If the job does setup work such as creating a child process, its terminate function should kill that child process. More specifically, this function requires the work spec to contain the keys module, run_function, and terminate_function, and calls terminate_function in module with self as its only parameter.

update(lease_time=None)[source]

Refresh this task’s expiration time.

This tries to set the task's expiration time to the current time plus lease_time seconds. It requires the job to not already be complete. If lease_time is negative, the job immediately becomes available for other workers to run.

Parameters:lease_time (int) – time to extend job lease beyond now
Raises rejester.exceptions.LostLease:
 if the lease has already expired
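
As a hedged sketch of lease renewal, a run function processing many batches of records might call update() between batches; `batches` and `process` here are hypothetical:

```python
def run_in_batches(work_unit, batches, process):
    # Renew the lease after each batch so a slow job is not reassigned
    # to another worker; update() raises LostLease if we are too late.
    results = []
    for batch in batches:
        results.append(process(batch))
        work_unit.update()  # extends expiry by default_lifetime seconds
    return results
```
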
finish()[source]

Move this work unit to a finished state.

In the standard worker system, the worker calls this on the job’s behalf when run_function() returns successfully.

Raises rejester.exceptions.LostLease:
 if the lease has already expired
fail(exc=None)[source]

Move this work unit to a failed state.

In the standard worker system, the worker calls this on the job’s behalf when run_function() ends with any exception:

try:
    work_unit.run()
    work_unit.finish()
except Exception as e:
    work_unit.fail(e)

A traceback property is recorded with a formatted version of exc, if any.

Parameters:exc – Exception that caused the failure, or None
Raises rejester.exceptions.LostLease:
 if the lease has already expired

7.4.2. Workers

class rejester.Worker(config, task_master=None)[source]

Bases: object

Process that runs rejester jobs.

Running a worker involves three steps: calling register() to get a worker_id and record our presence in the data store; calling run() to actually do work; and calling unregister() on clean exit. The run() method should periodically call heartbeat() to update its state and get the current run mode.
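
The three-step life cycle described above can be sketched as a small wrapper; this is illustrative glue code, not a rejester API:

```python
def run_registered(worker):
    # register() records our presence and assigns worker_id;
    # unregister() must run on exit even if run() raises.
    worker.register()
    try:
        worker.run()
    finally:
        worker.unregister()
```
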

config = None

Configuration for the worker

task_master = None

Task interface to talk to the data store

worker_id = None

Worker ID, only valid after register()

parent = None

Parent worker ID

lifetime = None

Required maximum time between heartbeat()

environment()[source]

Get raw data about this worker.

This is recorded in the heartbeat() info, and can be retrieved by TaskMaster.get_heartbeat(). The dictionary includes keys worker_id, host, fqdn, version, working_set, and memory.

register(parent=None)[source]

Record the availability of this worker and get a unique identifier.

This sets worker_id and calls heartbeat(). This cannot be called multiple times without calling unregister() in between.

unregister()[source]

Remove this worker from the list of available workers.

This requires the worker to have already called register().

heartbeat()[source]

Record the current worker state in the registry.

This records the worker’s current mode, plus the contents of environment(), in the data store for inspection by others.

Returns mode:Current mode, as TaskMaster.get_mode()
run()[source]

Run some number of jobs.

register() must have already been called. This is expected to get jobs using TaskMaster.get_work() with this worker’s worker_id. Depending on the semantics of the actual implementing class this may run one job, run jobs as long as the worker’s mode is RUN, or any other combination.

Rejester workers.

The standard worker infrastructure in the classes below calls rejester.WorkUnit.run() on individual work units as they become available. In normal use, a caller will use rejester.TaskMaster.update_bundle() to submit jobs, then expect an external caller to run rejester run_worker, which will create a MultiWorker object that runs those jobs.

Other implementation strategies are definitely possible. The SingleWorker class here will run exactly one job when invoked. It is also possible for a program that intends to do some work, possibly even in parallel, but wants to depend on rejester for queueing, to call rejester.TaskMaster.get_work() itself and do work based on whatever information is in the work spec; that would not use this worker infrastructure at all.
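
Such a direct consumer might look like the following sketch. The exact TaskMaster.get_work() signature (in particular the available_gb argument, which the standard workers report from system memory) should be checked against your rejester version, and `handle` is a hypothetical callback:

```python
def drain(task_master, worker_id, handle):
    # Pull work units directly, bypassing the worker classes; mark each
    # one finished or failed ourselves, and stop when the queue is empty.
    done = 0
    while True:
        unit = task_master.get_work(worker_id, available_gb=4)
        if unit is None:
            return done
        try:
            handle(unit)
            unit.finish()
        except Exception as exc:
            unit.fail(exc)
        done += 1
```
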

class rejester.workers.SingleWorker(config, task_master=None, work_spec_names=None, max_jobs=1)[source]

Bases: rejester._task_master.Worker

Worker that runs exactly one job when called.

This is used by the rejester.run.Manager.do_run_one() command to run a single job; that just calls run(). This is also invoked as the child process by ForkWorker, which calls as_child().

run(set_title=False)[source]

Do some work.

The standard implementation here calls run_one().

Parameters:set_title – if true, set the process’s title with the work unit name
Returns:True if there was a job (even if it failed)
run_one(set_title=False)[source]

Get exactly one job, run it, and return.

Does nothing (but returns False) if there is no work to do. Ignores the global mode; this will do work even if rejester.TaskMaster.get_mode() returns TERMINATE.

Parameters:set_title – if true, set the process’s title with the work unit name
Returns:True if there was a job (even if it failed)
EXIT_SUCCESS = 0

Exit code from as_child() if it ran a work unit (maybe unsuccessfully).

EXIT_EXCEPTION = 1

Exit code from as_child() if there was a failure getting the work unit.

EXIT_BORED = 2

Exit code from as_child() if there was no work to do.

classmethod as_child(global_config, parent=None)[source]

Run a single job in a child process.

This method never returns; it always calls sys.exit() with an error code that says what it did.
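
A parent process (such as ForkWorker) can dispatch on these exit codes; a hedged sketch, with values hardcoded to match the constants above:

```python
def next_action(exit_code):
    # Decide what the parent should do after an as_child() child exits.
    if exit_code == 0:    # EXIT_SUCCESS: ran one unit (maybe failed)
        return 'spawn-another'
    if exit_code == 2:    # EXIT_BORED: no work available
        return 'poll-later'
    return 'back-off'     # EXIT_EXCEPTION or unexpected status
```
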

class rejester.workers.MultiWorker(config)[source]

Bases: rejester._task_master.Worker

Parent worker that runs multiple jobs continuously.

This uses multiprocessing to run one child HeadlessWorker per core on the system, and averages system memory to report available_gb. This class manages the TaskMaster interactions and sends WorkUnit instances to its managed child processes.

This class is normally invoked from the command line by running rejester run_worker, which runs this class as a daemon process.

Instances of this class, running across many machines in a cluster, are controlled by rejester.TaskMaster.get_mode(). The run() method will exit if the current mode is TERMINATE. If the mode is IDLE then the worker will stay running but will not start new jobs. New jobs will be started only when the mode becomes RUN. The system defaults to IDLE state, but if workers exit immediately, it may be because the mode has been left at TERMINATE from a previous execution.

If tasks_per_cpu is set in the configuration block for rejester, then that many child processes will be launched for each CPU on the machine.
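
For instance, extending the configuration style shown earlier (the value 2 is illustrative):

```yaml
rejester:
  app_name: rejester
  namespace: mynamespace
  registry_addresses: [ "redis.example.com:6379" ]
  # start two HeadlessWorker children per CPU instead of the default one
  tasks_per_cpu: 2
```
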

run()[source]

Fetch and dispatch jobs as long as the system is running.

This periodically checks the rejester.TaskMaster mode and asks it for more work. It will normally run forever in a loop until the mode becomes TERMINATE, at which point it waits for all outstanding jobs to finish and exits.

This will heartbeat() and check for new work whenever a job finishes, or otherwise on a random interval between 1 and 5 seconds.

class rejester.workers.HeadlessWorker(config, worker_id, work_spec_name, work_unit_key)[source]

Bases: rejester._task_master.Worker

Child worker to do work under multiprocessing.

The run() method expects to run a single WorkUnit, which it will receive from its parent MultiWorker. This class expects to be the only thing run in a multiprocessing child process.

class rejester.workers.ForkWorker(config)[source]

Bases: rejester._task_master.Worker

Parent worker that runs multiple jobs concurrently.

This manages a series of child processes, each of which runs a SingleWorker. It runs as long as the global rejester state is not rejester.TaskMaster.TERMINATE.

This takes some additional optional configuration options. A typical configuration will look like:

rejester:
  # required rejester configuration
  registry_addresses: [ 'redis.example.com:6379' ]
  app_name: rejester
  namespace: namespace

  # indicate which worker to use
  worker: fork_worker
  fork_worker:
    # set this or num_workers; num_workers takes precedence
    num_workers_per_core: 1
    # how often to check if there is more work
    poll_interval: 1
    # how often to start more workers
    spawn_interval: 0.01
    # how often to record our existence
    heartbeat_interval: 15
    # minimum time a working worker will live
    child_lifetime: 10
    # kill off jobs this long before their deadlines
    stop_jobs_early: 15

This spawns child processes to do work. Each child process does at most one work unit. If num_workers is set, at most this many concurrent workers will be running at a time. If num_workers is not set but num_workers_per_core is, the maximum number of workers is a multiple of the number of processor cores available. The default setting is 1 worker per core, but setting this higher can be beneficial if jobs are alternately network- and CPU-bound.

The parent worker runs a fairly simple state machine. It awakens on startup, whenever a child process exits, or after a timeout. When it awakens, it checks on the status of all of its children, and collects the exit status of those that have finished. If any failed or reported no more work, the timeout is set to poll_interval, and no more workers are started until that timeout has passed. Otherwise, if it is not running the maximum number of workers, it starts exactly one and sets the timeout to spawn_interval.

This means that if the system is operating normally, and there is work to do, then it will start all of its workers in num_workers times spawn_interval time. If spawn_interval is 0, then any time the system thinks it may have work to do, it will spawn the maximum number of processes immediately, each of which will connect to Redis. If the system runs out of work, or if it starts all of its workers, it will check for work or system shutdown every poll_interval. The parent worker will contact Redis, recording its state and retrieving the global mode, every heartbeat_interval.

Every heartbeat_interval the parent also checks on the jobs its children are running. If any of them are overdue, or are being worked on by other workers, the parent will kill them to avoid having multiple workers doing the same work unit. Furthermore, if any children's jobs will expire within stop_jobs_early seconds, those jobs will be killed too even if they aren't expired yet, and any jobs killed this way will be marked failed if they are still owned by the same child worker. If stop_jobs_early is at least heartbeat_interval, this will reliably cause jobs that take longer than the expiry interval (default 300 seconds) to be killed off rather than retried.

set_signal_handlers()[source]

Set some signal handlers.

These react reasonably to shutdown requests, and keep the logging child alive.

clear_signal_handlers()[source]

Undo set_signal_handlers().

Not only must this be done on shutdown, but after every fork call too.

log(level, message)[source]

Write a log message via the child process.

The child process must already exist; call live_log_child() to make sure. If it has died in a way we don’t expect then this will raise signal.SIGPIPE.

debug(group, message)[source]

Maybe write a debug-level log message.

In particular, this gets written if the hidden debug_worker option contains group.

log_spewer(gconfig, fd)[source]

Child process to manage logging.

This reads pairs of lines from fd, which are alternating priority (Python integer) and message (unformatted string).

start_log_child()[source]

Start the logging child process.

stop_log_child()[source]

Stop the logging child process.

live_log_child()[source]

Start the logging child process if it died.

do_some_work(can_start_more)[source]

Run one cycle of the main loop.

If the log child has died, restart it. If any of the worker children have died, collect their status codes and remove them from the child set. If there is a worker slot available, start exactly one child.

Parameters:can_start_more (bool) – Allowed to start a child?
Returns:Time to wait before calling this function again
check_spinning_children()[source]

Stop children that are working on overdue jobs.

stop_gracefully()[source]

Refuse to start more processes.

This runs in response to SIGINT or SIGTERM; if this isn’t a background process, control-C and a normal kill command cause this.

stop_all_children()[source]

Kill all workers.

scram()[source]

Kill all workers and die ourselves.

This runs in response to SIGABRT, from a specific invocation of the kill command. It also runs if stop_gracefully() is called more than once.

run()[source]

Run the main loop.

This is fairly invasive: it sets a bunch of signal handlers and spawns off a bunch of child processes.

rejester.workers.run_worker(worker_class, *args, **kwargs)[source]

Bridge function to run a worker under multiprocessing.

The multiprocessing module cannot apply_async() to a class constructor, even if the __init__ calls .run(), so this simple wrapper calls worker_class(*args, **kwargs) and logs any exceptions before re-raising them.

This is usually only used to create a HeadlessWorker, but it does run through the complete register(), run(), unregister() sequence with some logging on worker-level failures.

7.4.3. Implementation Details

class rejester.Registry(config)[source]

Bases: rejester._redis.RedisBase

Store string-keyed dictionaries in Redis.

Provides a centralized storage mechanism for dictionaries, including atomic operations for moving (key, value) pairs between dictionaries, and incrementing counts.

Many operations on the registry require getting a lock via the database, for instance

>>> with registry.lock() as session:
...   value = session.get(k1, k2)

The lock mechanism ensures that no two Registry objects do work concurrently, even running on separate systems. Specific method descriptions will note if they can run without a lock. In general, read-only operations will always run successfully without a lock but will check for the correct lock if one is given; certain very simple operations do no lock checking at all; and read-write operations always require a lock.

The basic data object is a string-keyed dictionary stored under some key. The dictionary keys are also stored in a prioritized list. This in effect provides two levels of dictionary, using the Redis key and the dictionary key. The registry makes an effort to store all types of object as values, serializing them into CBOR.
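
A sketch of this two-level model in use; the 'tasks' dictionary name and its contents are hypothetical:

```python
def enqueue_and_peek(registry):
    # Store two CBOR-serializable values under the Redis key 'tasks',
    # each with a priority score, then read the whole dictionary back.
    with registry.lock() as session:
        session.update('tasks',
                       mapping={'t1': {'path': '/data/a'},
                                't2': {'path': '/data/b'}},
                       priorities={'t1': 10, 't2': 20})
        return session.pull('tasks')
```
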

re_acquire_lock(ltime=5)[source]

Re-acquire the lock.

You must already own the lock; this is best called from within a lock() block.

Parameters:ltime (int) – maximum time (in seconds) to own lock
Returns:the session lock identifier
Raises rejester.exceptions.EnvironmentError:
 if we didn’t already own the lock
lock(*args, **kwds)[source]

Context manager to acquire the namespace global lock.

This is typically used for multi-step registry operations, such as a read-modify-write sequence:

with registry.lock() as session:
    d = session.get('dict', 'key')
    del d['traceback']
    session.set('dict', 'key', d)

Callers may provide their own identifier; if they do, they must ensure that it is reasonably unique (e.g., a UUID). Using a stored worker ID that is traceable back to the lock holder is a good practice.

Parameters:
  • atime (int) – maximum time (in seconds) to acquire lock
  • ltime (int) – maximum time (in seconds) to own lock
  • identifier (str) – worker-unique identifier for the lock
read_lock()[source]

Find out who currently owns the namespace global lock.

This is purely a diagnostic tool. If you are trying to get the global lock, it is better to just call lock(), which will atomically get the lock if possible and retry.

Returns:session identifier of the lock holder, or None
force_clear_lock()[source]

Kick out whoever currently owns the namespace global lock.

This is intended as purely a last-resort tool. If another process has managed to get the global lock for a very long time, or if it requested the lock with a long expiration and then crashed, this can make the system functional again. If the original lock holder is still alive, its session calls may fail with exceptions.

update(dict_name, mapping=None, priorities=None, expire=None, locks=None)[source]

Add mapping to a dictionary, replacing previous values.

Can be called with only dict_name and expire to refresh the expiration time.

NB: locks are only enforced if present, so nothing prevents another caller from coming in and modifying data without using locks.

Parameters:
  • mapping – a dict of keys and values to update in dict_name. Must be specified if priorities is specified.
  • priorities – a dict with the same keys as those in mapping that provides a numerical value indicating the priority to assign to that key. Default sets 0 for all keys.
  • expire (int) – if specified, then dict_name will be set to expire in that many seconds.
  • locks – a dict with the same keys as those in the mapping. Before making any particular update, this function checks if a key is present in a ‘locks’ table for this dict, and if so, then its value must match the value provided in the input locks dict for that key. If not, then the value provided in the locks dict is inserted into the ‘locks’ table. If the locks parameter is None, then no lock checking is performed.
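
The locks parameter behaves like a per-key compare-and-set token; a hedged sketch (the 'claims' dictionary and token names are hypothetical):

```python
def write_with_token(registry, token):
    # The first writer's token is stored in the dict's 'locks' table;
    # later updates to the same key must present the same token or the
    # write is rejected.
    with registry.lock() as session:
        session.update('claims',
                       mapping={'job1': 'in-progress'},
                       locks={'job1': token})
```
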
reset_priorities(dict_name, priority)[source]

Set all priorities in dict_name to priority.

popmany(dict_name, *keys)[source]

Remove one or more keys from a dictionary.

If any of the keys are not present, they are silently ignored.

The actual deletion operation is atomic and does not require a session lock, but nothing stops another operation from creating the deleted keys immediately afterwards. You may call this with or without a session lock, but the operation will fail if some other worker holds one.

Parameters:
  • dict_name (str) – name of dictionary to modify
  • keys (str) – names of keys to remove
Returns:

number of keys removed

Raises rejester.exceptions.LockError:
 if the session lock timed out, or if this was called without a session lock and some other worker holds one

len(dict_name, priority_min='-inf', priority_max='+inf')[source]

Get the number of items in (part of) a dictionary.

Returns number of items in dict_name within [priority_min, priority_max]. This is similar to len(filter(dict_name, priority_min, priority_max)) but does not actually retrieve the items.

This is a read-only operation that does not require or honor a session lock.

Parameters:
  • dict_name (str) – dictionary name to query
  • priority_min (float) – lowest priority score
  • priority_max (float) – highest priority score
getitem_reset(dict_name, priority_min='-inf', priority_max='+inf', new_priority=0, lock=None)[source]

Select an item and update its priority score.

The item comes from dict_name, and has the lowest score at least priority_min and at most priority_max. If some item is found, change its score to new_priority and return it.

This runs as a single atomic operation but still requires a session lock.

Parameters:
  • dict_name (str) – source dictionary
  • priority_min (float) – lowest score
  • priority_max (float) – highest score
  • new_priority (float) – new score
  • lock (str) – lock value for the item
Returns:

pair of (key, value) if an item was reprioritized, or None

popitem(dict_name, priority_min='-inf', priority_max='+inf')[source]

Select an item and remove it.

The item comes from dict_name, and has the lowest score at least priority_min and at most priority_max. If some item is found, remove it from dict_name and return it.

This runs as a single atomic operation but still requires a session lock.

Parameters:
  • dict_name (str) – source dictionary
  • priority_min (float) – lowest score
  • priority_max (float) – highest score
Returns:

pair of (key, value) if an item was popped, or None

popitem_move(from_dict, to_dict, priority_min='-inf', priority_max='+inf')[source]

Select an item and move it to another dictionary.

The item comes from from_dict, and has the lowest score at least priority_min and at most priority_max. If some item is found, remove it from from_dict, add it to to_dict, and return it.

This runs as a single atomic operation but still requires a session lock.

Parameters:
  • from_dict (str) – source dictionary
  • to_dict (str) – destination dictionary
  • priority_min (float) – lowest score
  • priority_max (float) – highest score
Returns:

pair of (key, value) if an item was moved, or None

move(from_dict, to_dict, mapping, priority=None)[source]

Move keys between dictionaries, possibly with changes.

Every key in mapping is removed from from_dict, and added to to_dict with its corresponding value. The priority will be priority, if specified, or else its current priority.

This operation on its own is atomic and does not require a session lock; however, it does require you to pass in the values, which probably came from a previous query call. If you do not call this with a session lock but some other caller has one, you will get rejester.LockError. If you do have a session lock, this will check that you still have it.

Parameters:
  • from_dict (str) – name of original dictionary
  • to_dict (str) – name of target dictionary
  • mapping (dict) – keys to move with new values
  • priority (int) – target priority, or None to use existing
Raises:
  • rejester.LockError – if the session lock timed out
  • rejester.EnvironmentError – if some items didn’t move
move_all(from_dict, to_dict)[source]

Move everything from one dictionary to another.

This can be expensive if the source dictionary is large.

This always requires a session lock.

Parameters:
  • from_dict (str) – source dictionary
  • to_dict (str) – destination dictionary
pull(dict_name)[source]

Get the entire contents of a single dictionary.

This operates without a session lock, but is still atomic. In particular this will run even if someone else holds a session lock and you do not.

This is only suitable for “small” dictionaries; if you have hundreds of thousands of items or more, consider filter() instead to get a subset of a dictionary.

Parameters:dict_name (str) – name of the dictionary to retrieve
Returns:corresponding Python dictionary
filter(dict_name, priority_min='-inf', priority_max='+inf', start=0, limit=None)[source]

Get a subset of a dictionary.

This retrieves only keys with priority scores greater than or equal to priority_min and less than or equal to priority_max. Of those keys, it skips the first start ones, and then returns at most limit keys.

With default parameters, this retrieves the entire dictionary, making it a more expensive version of pull(). This can be used to limit the dictionary by priority score, for instance using the score as a time stamp and only retrieving values before or after a specific time; or it can be used to get slices of the dictionary if there are too many items to use pull().

This is a read-only operation and does not require a session lock, but if this is run in a session context, the lock will be honored.

Parameters:
  • dict_name (str) – name of the dictionary to retrieve
  • priority_min (float) – lowest score to retrieve
  • priority_max (float) – highest score to retrieve
  • start (int) – number of items to skip
  • limit (int) – number of items to retrieve
Returns:

corresponding (partial) Python dictionary

Raises rejester.LockError:
 if the session lock timed out
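
For example, using the priority score as a timestamp as suggested above (the 'events' dictionary is hypothetical):

```python
import time

def events_in_last_hour(registry):
    # If scores were stored as time.time() values when items were added,
    # then a priority range selects a time window.
    now = time.time()
    return registry.filter('events',
                           priority_min=now - 3600.0,
                           priority_max=now)
```
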

set_1to1(dict_name, key1, key2)[source]

Set two keys to be equal in a 1-to-1 mapping.

Within dict_name, key1 is set to key2, and key2 is set to key1.

This always requires a session lock.

Parameters:
  • dict_name (str) – dictionary to update
  • key1 (str) – first key/value
  • key2 (str) – second key/value
get(dict_name, key, default=None, include_priority=False)[source]

Get the value for a specific key in a specific dictionary.

If include_priority is false (default), returns the value for that key, or default (defaults to None) if it is absent. If include_priority is true, returns a pair of the value and its priority, or of default and None.

This does not use or enforce the session lock, and is read-only, but inconsistent results are conceivably possible if the caller does not hold the lock and include_priority is set.

Parameters:
  • dict_name (str) – name of dictionary to query
  • key (str) – key in dictionary to query
  • default – default value if key is absent
  • include_priority (bool) – include score in results
Returns:

value from dictionary, or pair of value and priority

set(dict_name, key, value, priority=None)[source]

Set a single value for a single key.

This requires a session lock.

Parameters:
  • dict_name (str) – name of the dictionary to update
  • key (str) – key to update
  • value (str) – value to assign to key
  • priority (int) – priority score for the value (if any)
delete(dict_name)[source]

Delete an entire dictionary.

This operation on its own is atomic and does not require a session lock, but a session lock is honored.

Parameters:dict_name (str) – name of the dictionary to delete
Raises rejester.exceptions.LockError:
 if called with a session lock, but the system does not currently have that lock; or if called without a session lock but something else holds it
direct_call(*args)[source]

Execute a direct Redis call against this Registry instance's namespaced keys. This is low-level and should only be used for prototyping.

  • args[0] – Redis function name
  • args[1] – key; will be namespaced before execution
  • args[2:] – arguments to the function

Returns the raw return value of the function. Neither arguments nor return values are encoded/decoded.

7.5. Priority queue system

class rejester.RejesterQueue(config, name, worker_id=None)[source]

Bases: rejester._redis.RedisBase

A Redis-based priority queue.

Queue items should generally be (short) strings. Each item has some associated priority. check_out_item() will return some item with the highest available priority. The item must be renewed with renew_item() or returned with return_item() within a specified expiration limit, or the item will be lost and may be given to another client.

If you have a queue item checked out, you may also reserve other queue items. This prevents other clients from checking out or reserving these items. When you return the main queue item, or if your check out expires, reserved items will be released.

Since the queue is based on Redis, there may be multiple queue clients on the same queue on different systems. Each RejesterQueue instance has a distinct worker ID, held in the worker_id property. You can specify it in the constructor or set the property manually; otherwise, a unique ID will be obtained via the database on first use.
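
The check-out cycle can be sketched as follows; the 300-second lease and the retry-at-priority-0 policy are illustrative choices, not rejester defaults:

```python
def process_one(queue, handle):
    # Take the highest-priority item under a 300-second lease; remove it
    # on success (priority None), or requeue it at priority 0 on failure.
    item = queue.check_out_item(300)
    if item is None:
        return None
    try:
        handle(item)
        queue.return_item(item, None)
    except Exception:
        queue.return_item(item, 0)
        raise
    return item
```
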

dump_queue(*names)[source]

Debug-log some of the queues.

names may include any of “worker”, “available”, “priorities”, “expiration”, “workers”, or “reservations_ITEM” filling in some specific item.

worker_id

A unique identifier for this queue instance and the items it owns.

add_item(item, priority)[source]

Add item to this queue.

It will have the specified priority (highest priority runs first). If it is already in the queue, fail if it is checked out or reserved, or change its priority to priority otherwise.

check_out_item(expiration)[source]

Get the highest-priority item out of this queue.

Returns the item, or None if no items are available. The item must be either return_item() or renew_item() before expiration seconds pass, or it will become available to future callers. The item will be marked as being owned by worker_id.

renew_item(item, expiration)[source]

Update the expiration time for item.

The item will remain checked out for expiration seconds beyond the current time. This queue instance must have already checked out item, and this method can fail if item is already overdue.

return_item(item, priority)[source]

Complete work on an item from check_out_item().

If this instance no longer owns item, raise LostLease. If priority is None, the item is removed from the queue; otherwise it is re-added with the specified priority. Any locked items associated with this item are unlocked.

reserve_items(parent_item, *items)[source]

Reserve a set of items until a parent item is returned.

Prevent check_out_item() from returning any of items until parent_item is completed or times out. For each item, if it is not already checked out or reserved by some other parent item, it is associated with parent_item, and the reservation will be released when parent_item completes or times out. Returns a list that is a subset of items for which we could get the reservation.

Raises LostLease if this queue instance no longer owns parent_item. If any of the items do not exist, they are silently ignored.

7.6. Exceptions

Exceptions raised in various places in rejester.

exception rejester.exceptions.RejesterException[source]

Bases: exceptions.Exception

base exception for rejester package

exception rejester.exceptions.EnvironmentError[source]

Bases: rejester.exceptions.RejesterException

indicates that the registry lost a lock or experienced a similar failure that probably indicates a network or remote server failure

exception rejester.exceptions.LockError[source]

Bases: rejester.exceptions.RejesterException

attempt to get a lock exceeded acquire time (atime)

exception rejester.exceptions.NoSuchWorkSpecError(work_spec_name, *args, **kwargs)[source]

Bases: rejester.exceptions.RejesterException

A TaskMaster function was called with a nonexistent work spec

work_spec_name = None

Name of the nonexistent work spec

exception rejester.exceptions.NoSuchWorkUnitError(work_unit_name, *args, **kwargs)[source]

Bases: rejester.exceptions.RejesterException

Valid work spec but invalid work unit.

This occurs when a rejester.TaskMaster function that manipulates existing work units is called with a valid work spec name but an invalid work unit name.

work_unit_name = None

Name of the nonexistent work unit

exception rejester.exceptions.ProgrammerError[source]

Bases: rejester.exceptions.RejesterException

exception rejester.exceptions.PriorityRangeEmpty[source]

Bases: rejester.exceptions.RejesterException

given the priority_min/max, no item is available to be returned

exception rejester.exceptions.LostLease[source]

Bases: rejester.exceptions.RejesterException

the worker waited too long between calls to update and another worker acquired the WorkItem

exception rejester.exceptions.ItemInUseError[source]

Bases: rejester.exceptions.RejesterException

tried to add an item to a queue that was already in use
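Because every exception above derives from RejesterException, callers can catch the base class to handle any rejester failure uniformly while still treating specific cases, such as a lost lease, differently. A minimal sketch of that pattern, using locally defined stand-ins for the real classes (the handler logic is hypothetical, not part of rejester):

```python
# Stand-in hierarchy mirroring rejester.exceptions, for illustration only.
class RejesterException(Exception):
    """base exception for the rejester package"""

class LostLease(RejesterException):
    """another worker took over the work item"""

# Note: like rejester.exceptions.EnvironmentError, this shadows the builtin.
class EnvironmentError(RejesterException):
    """registry lost a lock; probably a network or server failure"""

def process(work_unit, do_work):
    """Run one work unit, distinguishing lease loss from other failures."""
    try:
        do_work(work_unit)
    except LostLease:
        # Another worker now owns this unit; drop it without retrying.
        return 'lost'
    except RejesterException:
        # Any other rejester failure (lock loss, missing spec, ...).
        return 'failed'
    return 'done'
```

Catching the subclass before the base class is what lets the lost-lease case short-circuit while everything else falls through to the generic handler.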

7.7. Example: StreamCorpus Simple Filter Stage

Rejester makes it easy to run large batches of small jobs: for example, a couple of million tasks that each tag a batch of 500 documents with an NER tagger, or that filter a corpus down to a smaller set of documents.

As a simple example, we illustrate how to write a filter function as an external stage and run it in AWS EC2.

streamcorpus_pipeline has several built-in filters (see the source on GitHub). You can create your own as external stages; for example, this exact-name-match filter:

## use the newer regex engine
import regex as re
import string

## make a unicode translation table that converts all punctuation to whitespace
strip_punctuation = dict((ord(char), u" ") for char in string.punctuation)

white_space_re = re.compile(r"\s+")

def strip_string(s):
    'strips punctuation and repeated whitespace from unicode strings'
    return white_space_re.sub(" ", s.translate(strip_punctuation).lower())


class filter_exact_match(object):
    'trivial string matcher using simple normalization and regex'

    config_name = 'filter_exact_match'

    def __init__(self, config):
        path = config.get('match_strings_path')
        with open(path) as f:
            match_strings = f.read().splitlines()
        match_strings = map(strip_string, map(unicode, match_strings))
        self.matcher = re.compile(r"(.|\n)*?(%s)" % '|'.join(match_strings), re.I)

    def __call__(self, si, context):
        'only pass StreamItems that match'
        if self.matcher.match(strip_string(si.body.clean_visible.decode('utf-8'))):
            return si


## this is how streamcorpus_pipeline finds the stage
Stages = {'filter_exact_match': filter_exact_match}
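You can exercise the normalization helper on its own. The following is a Python 3 rendering of strip_string (using the standard re module instead of regex, and str.maketrans instead of a hand-built dict) that shows what the filter actually compares:

```python
import re
import string

# Translation table mapping every punctuation character to a space.
strip_punctuation = str.maketrans({c: " " for c in string.punctuation})
white_space_re = re.compile(r"\s+")

def strip_string(s):
    """Strip punctuation and collapse repeated whitespace, lowercased."""
    return white_space_re.sub(" ", s.translate(strip_punctuation).lower())

# Both the match strings and each document body pass through the same
# normalization, so "John O'Brien" and "john o brien" compare equal.
print(strip_string("John O'Brien"))   # -> "john o brien"
```

Because the same function is applied to both sides of the comparison, differences in punctuation, case, and spacing never prevent a match.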

To run this in rejester, you need to set up a Redis server (version 2.8 or newer) and put its hostname in your YAML configuration file:

logging:
  root:
    level: INFO

rejester:
  namespace: my_kba_filtering
  app_name: rejester
  registry_addresses: ["redis.example.com:6379"]

streamcorpus_pipeline:
  ## "." means current working directory
  root_path: .

  tmp_dir_path: tmp
  cleanup_tmp_files: true

  external_stages_path: examples/filter_exact_match.py

  reader: from_s3_chunks

  incremental_transforms:
    ## remove all StreamItems that do not exactly match
    - filter_exact_match

  batch_transforms: []

  filter_exact_match:
    ## names ending in "_path" will be made absolute relative to the root_path
    match_strings_path: my_match_strings.txt

  writers: [to_s3_chunks]

  from_s3_chunks:
    ## put paths to your own keys here; these files must be just the
    ## access_key_id and secret_access_key strings without newlines
    aws_access_key_id_path:     /data/trec-kba/keys/trec-aws-s3.aws_access_key_id
    aws_secret_access_key_path: /data/trec-kba/keys/trec-aws-s3.aws_secret_access_key

    ## this is the location of the NIST's StreamCorpus
    bucket: aws-publicdatasets
    s3_path_prefix: trec/kba/kba-streamcorpus-2014-v0_3_0

    tries: 10
    input_format: streamitem
    streamcorpus_version: v0_3_0

    ## you need this key if you are processing NIST's StreamCorpus,
    ## which is encrypted
    gpg_decryption_key_path: /data/trec-kba/keys/trec-kba-rsa.gpg-key.private

  to_s3_chunks:

    ## put your own bucket and paths to your own keys here; these
    ## files must be just the access_key_id and secret_access_key
    ## strings without newlines
    aws_access_key_id_path:     /data/trec-kba/keys/trec-aws-s3.aws_access_key_id
    aws_secret_access_key_path: /data/trec-kba/keys/trec-aws-s3.aws_secret_access_key

    bucket: aws-publicdatasets
    s3_path_prefix: trec/kba/kba-streamcorpus-2014-v0_3_0-to-delete

    output_name: "%(date_hour)s/%(source)s-%(num)d-%(input_md5)s-%(md5)s"
    tries: 10
    cleanup_tmp_files: true

    ## you only need these keys if you are encrypting the data that
    ## you are putting into your bucket; you need both if you require
    ## verify_via_http, which fetches and decrypts to validate what
    ## was saved.
    gpg_decryption_key_path: /data/trec-kba/keys/trec-kba-rsa.gpg-key.private
    gpg_encryption_key_path: /data/trec-kba/keys/trec-kba-rsa.gpg-key.pub
    gpg_recipient: trec-kba

    verify_via_http: true

Then, you can run the following commands to put tasks into the rejester queue:

## populate the task queue with jobs
streamcorpus_directory -c examples/streamcorpus-2014-v0_3_0-exact-match-example.yaml --file-lists list_of_s3_paths.txt

## try running one, to make sure it works locally
rejester -c examples/streamcorpus-2014-v0_3_0-exact-match-example.yaml run_one

## launch a MultiWorker to use all the CPUs on this machine:
rejester -c examples/streamcorpus-2014-v0_3_0-exact-match-example.yaml run_worker

## check on the status of your jobs
rejester -c examples/streamcorpus-2014-v0_3_0-exact-match-example.yaml summary

If you are interested in SaltStack states for spinning up machines to do this, reach out at streamcorpus@googlegroups.com.