source documentation
cli
The cli module implements the command line tool using click.
class clichain.cli.Cli(name=None, invoke_without_command=False, no_args_is_help=None, subcommand_metavar=None, chain=False, result_callback=None, **attrs)
Implements the root command using click.MultiCommand.
Cli provides the implementation of the root command by extending click.MultiCommand. The click command is then created by specifying Cli as the “cls” parameter of the click.command decorator.
get_command(ctx, name)
Given a context and a command name, this returns a Command object if it exists, or None otherwise.
list_commands(ctx)
Returns a list of subcommand names in the order they should appear.
class clichain.cli.Tasks(commands=None)
Defines a factory to register task types.
Tasks provides a factory to register and hold implemented tasks so that they are available to the user. Tasks are implemented by coroutines.
The command line interface is implemented using click. In order to make a task usable from the command line interface, you need to define a click command (in most cases using the click.command decorator). example:
    @click.command(name='compute')
    @click.option('--approximate', '-a', help='compute with approximation')
    @click.argument('x')
    def my_compute_task(approximate, x):
        " the task doc that will appear as help "
        # process inputs parameters and options...
See also
click for details on how to implement commands
The command is expected to return a coroutine function such as clichain.pipeline.coroutine, see clichain.pipeline.create for details.
full example:
    from clichain import pipeline, cli
    import click
    import ast

    # creates the factory here but should be
    # common to all task types...
    tasks = cli.Tasks()

    @pipeline.task
    def divide(ctrl, den):
        print(f'{ctrl.name} is starting')
        with ctrl as push:
            while True:
                value = yield
                push(value / den)
        print(f'{ctrl.name} has finished with no error')

    # the task will be made available as 'compute' in the
    # command line interface
    @tasks
    @click.command(name='compute')
    @click.option('--approximate', '-a', is_flag=True,
                  help='compute with approximation')
    @click.argument('den', type=float)
    def compute_task_cli(approximate, den):
        " the task doc that will appear as help "
        if approximate:
            den = round(den)
        # checked after rounding, which can also produce 0
        if den == 0:
            raise click.BadParameter("can't divide by 0")
        return divide(den)

    @pipeline.task
    def parse(ctrl):
        _parse = ast.literal_eval
        with ctrl as push:
            while True:
                try:
                    push(_parse((yield)))
                except (SyntaxError, ValueError):
                    pass

    @tasks
    @click.command(name='parse')
    def parse_cli():
        " performs literal_eval on data "
        return parse()
__call__(cmd)
Wraps and registers a click.command object into the factory.
A Tasks object is intended to be used as a decorator:
    tasks = cli.Tasks()

    # register 'compute' command into 'tasks'
    @tasks
    @click.command(name='compute')
    [...]
The command is expected to return a coroutine function such as clichain.pipeline.coroutine, see clichain.pipeline.create for details.
Note
A log message will be emitted to indicate this task is created every time the command is called.
__init__(commands=None)
Initializes a new factory with commands.
commands is a dict containing all registered commands, and will be set to a new empty dict if None.
context is the current click context. It is set to None until the main command is called (see app), which sets context to the current context value.
_prepare_cmd(cmd)
Wraps the click.command callback function and replaces it.
The wrapper function will:
- log a ‘create’ message (using logger.info)
- use the callback function result to create the next coroutine in the pipeline. The coroutine function created by the callback function is stored in the current click context. A stack is used to process the pipeline’s branches.
See also
clichain.pipeline for details on how the pipeline is specified
The wrapper function does not return anything.
See also
this method is called by Tasks.__call__
clichain.cli._get_obj(tasks, args, kwargs)
Get the obj parameter for the click context.
The created obj is a dict; it is used internally when processing commands.
clichain.cli.app(tasks, *args, **kw)
Run the click main command: this will start the CLI tool.
tasks is the Tasks factory to use (containing user commands).
Extra args and kwargs are added to the click context’s obj (a dict) as ‘args’ and ‘kwargs’; they are not used by the framework.
app uses Cli, which extends click.MultiCommand, to create the main command as a multicommand interface. This main command holds all the user-defined commands and is the main entry point of the created tool.
The pipeline itself is created and run by the process function, which is called when the main command itself returns, i.e. when all the input args have been processed by click.
See also
the main command itself only sets up logging, see also usage
clichain.cli.process(obj, rv, logfile, verbose)
Callback of the main command, called by click.
process creates the pipeline (using clichain.pipeline.create), then runs it with inputs from stdin and sends outputs to stdout (getting stdin and stdout from click.get_text_stream).
If an exception occurs, the exception is logged and click.Abort is raised.
clichain.cli.test(tasks, clargs, args=None, kwargs=None, **kw)
Run the CLI using click.testing, intended for automated tests.
The main command is then run with click.testing.CliRunner; this is roughly equivalent to:
    >>> runner = click.testing.CliRunner()
    >>> obj = cli._get_obj(tasks, args, kwargs)
    >>> result = runner.invoke(cli._app, clargs, obj=obj, **kw)
tasks is the Tasks factory to use (containing user commands).
clargs is a list containing the command line arguments, i.e. what the user would send in interactive mode.
Optional args and kwargs will be sent to the click context (in context.obj[‘args’] and context.obj[‘kwargs’]); they will not be used by the framework.
Extra kw will be forwarded to click.testing.CliRunner.invoke, for example: input=[1,2,3], catch_exceptions=False
Creates a click.testing.CliRunner to invoke the main command and returns the runner.invoke result.
See also
click.testing
clichain.cli.usage()
Create a pipeline of tasks, read text data from the standard input and send results to the standard output:
stdin(text) --> tasks... --> stdout(text)
The overall principle is to run a data stream processor by chaining different kinds of tasks (available tasks depending on the implementation, see list below).
you can create a single branch pipeline as a sequence of tasks, for instance:
inp >>> A -> B -> C >>> out
or you can create a more complex pipeline defining multiple branches, for instance:
               +--> B --> C --+
    inp >>> A--|              +--> F >>> out
               +--> D --> E --+
Tasks are implemented by coroutine functions as described by David Beazley (see http://www.dabeaz.com/coroutines/ for details).
Specifying pipeline workflow:
A basic syntax allows you to specify the workflow of the pipeline.
A single sequence of tasks as the following:
inp >>> A -> B -> C >>> out
is specified as:
A B C
Note
plus parameters and options of the tasks themselves, i.e:
A -x -y arg1 B -z ...
Creating branches requires ‘workflow commands’, for instance the following example:
               +--> B --> C --+
    inp >>> A--|              +--> F >>> out
               +--> D --> E --+
would be specified as:
A [ B C , D E ] F
the same way we can define sub branches, for instance:
                        +--> C1 --+
               +--> B --|         +-----+
               |        +--> C2 --+     |
    inp >>> A--|                        +--> F >>> out
               +--> D --> E ------------+
would be specified as:
A [ B [ C1 , C2 ] , D E ] F
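To make the bracket syntax concrete, here is a minimal sketch of how such a specification could be turned into a nested structure. It is not clichain's own parser: the function name parse_workflow and the chosen representation (a list for a sequence, a tuple of lists for parallel branches) are assumptions made for illustration only.

```python
def parse_workflow(tokens):
    """Parse tokens such as ['A', '[', 'B', ',', 'C', ']', 'F'].

    A sequence is returned as a list; a set of parallel branches is
    returned as a tuple of sequences (hypothetical representation).
    """

    def sequence(pos):
        # Read task names until the end of the current branch.
        seq = []
        while pos < len(tokens) and tokens[pos] not in (',', ']'):
            if tokens[pos] == '[':
                branches, pos = fork(pos + 1)
                seq.append(branches)
            else:
                seq.append(tokens[pos])
                pos += 1
        return seq, pos

    def fork(pos):
        # Read comma separated branches until the closing bracket.
        branches = []
        while True:
            seq, pos = sequence(pos)
            branches.append(seq)
            if pos >= len(tokens):
                raise ValueError("missing ']'")
            if tokens[pos] == ']':
                return tuple(branches), pos + 1
            pos += 1  # skip the ',' separator

    seq, pos = sequence(0)
    if pos != len(tokens):
        raise ValueError(f'unexpected {tokens[pos]!r}')
    return seq


spec = parse_workflow('A [ B [ C1 , C2 ] , D E ] F'.split())
print(spec)
```

Each nesting level of brackets becomes one more level of tuples, which mirrors the sub-branch diagram above.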
Execution order:
When parallel branches are defined (as ‘C1’ and ‘C2’ in the previous example) they are processed in the same order as they are defined in the command line arguments, that means in this example:
A [ B [ C1 , C2 ] , D E ] F
If the input data is:
    1
    2
    [...]
Then the workflow will be such as:
    # data will go through C1 then C2
    1 -> A -> B -> C1 -> F
    1 -> A -> B -> C2 -> F
    2 -> A -> B -> C1 -> F
    2 -> A -> B -> C2 -> F
    [...]
And the order is reproducible.
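The ordering can be reproduced with plain generator-based coroutines. The sketch below is independent of clichain: primed and stage are hypothetical helpers, and each stage simply records the value it receives before forwarding it to its targets in the order they were given.

```python
def primed(gen_func):
    # Advance a new generator to its first ``yield`` so it can receive data.
    def start(*args):
        gen = gen_func(*args)
        next(gen)
        return gen
    return start


@primed
def stage(name, targets, trace):
    while True:
        value = yield
        trace.append(f'{value} -> {name}')
        for target in targets:  # targets are served in declaration order
            target.send(value)


trace = []
f = stage('F', [], trace)
c1 = stage('C1', [f], trace)
c2 = stage('C2', [f], trace)
a = stage('A', [c1, c2], trace)

for value in (1, 2):
    a.send(value)

print(trace[:5])
# ['1 -> A', '1 -> C1', '1 -> F', '1 -> C2', '1 -> F']
```

Since everything runs in a single thread, a value travels all the way down the first branch before the second branch sees it.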
Attaching a name to branches or tasks:
You can attach a name to coroutines when defining the pipeline, which will be used as a suffix to get the logger if an exception occurs in the coroutine, i.e.:
base_logger.getChild(<name>)
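As a small illustration using only the standard logging module, this is what getChild produces. The base logger name 'clichain.pipeline' and the branch name 'b1' are assumptions chosen for the example.

```python
import logging

# Assumed base logger name, used here only for illustration.
base_logger = logging.getLogger('clichain.pipeline')

# The name attached to a branch or task becomes the logger suffix.
branch_logger = base_logger.getChild('b1')
print(branch_logger.name)  # clichain.pipeline.b1
```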
Note
This is useful in particular if you’re using the same task type in several branches. The name could be used as well in the coroutine, depending on its implementation (see clichain.pipeline.create for more details).
example:
               +--> B --> C --+
    inp >>> A--|              +--> B >>> out
               +--> B --> D --+
you could specify the name of the branches (i.e all the coroutines of those branches) with:
A [ { 'b1' B C } , { 'b2' B D } ] { 'b3' B }
Note
the name specification is valid for every coroutine whose definition starts within the braces, for example:
A { 'b1' [ B C , B D ] } B
is equivalent to:
A [ { 'b1' B C , B D } ] B
which is also equivalent to:
A [ { 'b1' B C , B D ] } B
which is equivalent to:
A [ { 'b1' B C } , { 'b1' B D } ] B
note the output ‘B’ coroutine will have no name
And using the following specification:
A [ { 'b1' B } C , { 'b2' B } D ] B
will only give ‘b1’ and ‘b2’ names to the ‘B’ coroutines (and not to the ‘C’ and ‘D’ coroutines as in the previous example).
Then note the following:
A { 'b1' [ B C , { 'b2' B } D ] } B
is equivalent to:
A [ { 'b1' B C } , { 'b2' B } { 'b1' D } ] B
Then note that the following:
A [ { 'b1' } B , { 'b2' } B ] B
will have no effect at all.
pipeline
The pipeline module provides tools to create a pipeline of tasks.
A pipeline can be composed of one or several branches, but everything runs in a single thread. The initial goal of this framework is to provide a simple and direct way of defining task types and reusing them in different pipeline configurations.
The motivation is not to parallelise tasks, although tasks could be parallelised in some configurations, depending on the exact use case and the context.
Tasks are implemented by coroutine functions as described by David Beazley (see http://www.dabeaz.com/coroutines/ for details).
This module is used by the clichain.cli module.
class clichain.pipeline.Context(logger=<Logger clichain.pipeline (WARNING)>, obj=None)
Will be passed to all coroutines in the pipeline.
A Context object is a common object shared by all coroutines in the pipeline.
attributes:
- exceptions is a list which remains empty until an exception occurs within a task and is handled by the module. Then exceptions contains each exception caught by the module. Each exception is logged only once, with its traceback, when it is caught by the Control context manager.
  Note
  If an exception is caught by the module it will be “re-raised”, thus terminating the process, but user code could still raise other exceptions, for example if a coroutine is not implemented using task or if GeneratorExit is handled within the user loop.
- logger will be used for every message logged by the module, and can be used by the user. The default is to use the module’s logger.
- obj is an arbitrary object provided by the user when creating the pipeline. The default is None.
class clichain.pipeline.Control(context, name, targets)
Internal, ‘control’ obj received by task decorated functions.
Control is a context manager.
__enter__()
Return the push function.
__exit__(tpe, value, tb)
Handle the GeneratorExit exception and log unhandled exceptions.
A Control object is created by the task decorator. The decorated function gets the Control object as its first arg, and is expected to use it as a context manager, which will handle GeneratorExit and log any unhandled exception.
The context attribute (Context object) will be used if the exception is not None or GeneratorExit, in order to:
- determine if the exception traceback should be logged: if the exception has already been logged by another Control object (i.e. in another coroutine), then only an error message will be logged, otherwise the full exception will be recorded.
- get the base logger to use
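The behaviour described above can be sketched with a small stand-alone context manager. This is not clichain's Control class: Guard, worker and the shared seen list are hypothetical names, and the sketch only shows the general technique of suppressing GeneratorExit while logging other exceptions once with their traceback.

```python
import logging


class Guard:
    """Hypothetical context manager mimicking the described behaviour."""

    def __init__(self, logger, seen):
        self.logger = logger
        self.seen = seen  # exceptions already logged, shared between guards

    def __enter__(self):
        return self

    def __exit__(self, tpe, value, tb):
        if tpe is None or issubclass(tpe, GeneratorExit):
            return True  # normal termination: suppress GeneratorExit
        if any(value is exc for exc in self.seen):
            # Already logged elsewhere: short message only.
            self.logger.error('failing (%r)', value)
        else:
            self.seen.append(value)
            self.logger.error('failing', exc_info=(tpe, value, tb))
        return False  # let the exception propagate


def worker(guard, events):
    with guard:
        while True:
            value = yield
            events.append(10 / value)
    # Reached only when the loop was ended by close().
    events.append('teardown')


seen, events = [], []
logger = logging.getLogger('sketch')
gen = worker(Guard(logger, seen), events)
next(gen)       # prime the generator
gen.send(5)
gen.close()     # raises GeneratorExit at the yield, which Guard suppresses
print(events)   # [2.0, 'teardown']
```

Because the guard swallows GeneratorExit, code placed after the with block acts as a teardown step that runs only on a clean shutdown.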
__init__(context, name, targets)
Initialize a new Control object.
context is a Context object; the Control object will use it to log the exception if one occurs in the coroutine while it is used as a context manager. The Context object is also accessible through the context attribute.
name will be accessible as the name attribute (rw)
ex:
logger = logging.getLogger(f'{__name__}.{ctrl.name}')
targets will be accessible through the targets property (ro)
targets is read only to ensure consistency with the push function returned by Control.__enter__: the Control object is expected to be used as a context manager:
    with ctrl as push:
        while True:
            data = yield
            push(data)  # send data to next coroutines
A push function is defined and returned when Control is used as a context manager, but can actually be created using the Control.push property.
The purpose is to force using an efficient solution that avoids attribute lookups (using self) on every call, which has an impact given this function is likely to be called a lot (usually for each processed item). This way we define the function and reference it once in the user function (as ‘push’ in the example).
See also
task decorator
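The idea of building the push function once can be sketched as follows. Fanout and collector are hypothetical names and this is not the actual Control implementation; it only illustrates a property returning a closure that holds pre-resolved send methods.

```python
class Fanout:
    """Hypothetical stand-in for an object holding the next coroutines."""

    def __init__(self, targets):
        self._targets = tuple(targets)

    @property
    def push(self):
        # Resolve the bound ``send`` methods once, when the property is read.
        senders = tuple(target.send for target in self._targets)

        def push(value):
            # No attribute lookup on ``self`` happens per item here.
            for send in senders:
                send(value)

        return push


def collector(store):
    while True:
        store.append((yield))


left, right = [], []
sinks = [collector(left), collector(right)]
for sink in sinks:
    next(sink)  # prime each generator

push = Fanout(sinks).push  # referenced once, then called per item
for item in (1, 2, 3):
    push(item)

print(left, right)  # [1, 2, 3] [1, 2, 3]
```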
push
Return a ‘push’ function sending data to the next coroutines.
targets
Next coroutines in the pipeline.
class clichain.pipeline.Pipeline(targets)
User interface returned by the create function.
A Pipeline object contains the coroutines of a pipeline.
When used as a context manager, it ensures that coroutines will be closed immediately at the end of the process.
Pipeline also has an additional Pipeline.run method which can be called to run the pipeline over an input stream and wait until the process completes.
__enter__()
Return a function sending data through the pipeline.
ex:
    with pipeline as process:
        for data in stream:
            process(data)
Note
this is equivalent to:
    with pipeline:
        target = pipeline.target
        for data in stream:
            target.send(data)
__exit__(tpe, value, tb)
Close all the coroutines of the pipeline, raise exc if any.
The purpose of using the Pipeline object as a context manager is essentially to make sure all the coroutines will be terminated (closed) at the end of the process.
This can be critical if user functions are expected to do some teardown jobs after processing data, for instance:
    # file won't be closed until the coroutine is closed
    # (see while loop...)
    @coroutine
    def cr(targets, *args, file, **kw):
        with open(file) as f:
            while True:
                data = yield
                [...]
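A minimal sketch of this closing behaviour, using plain generators, is shown below. ClosingPipeline and recorder are hypothetical names, not part of clichain; the sketch assumes, as stated for Pipeline.__init__, that the first item is the input coroutine.

```python
class ClosingPipeline:
    """Hypothetical context manager closing its coroutines on exit."""

    def __init__(self, targets):
        self.targets = list(targets)

    def __enter__(self):
        # The first item is treated as the input coroutine.
        return self.targets[0].send

    def __exit__(self, tpe, value, tb):
        for target in self.targets:
            target.close()  # triggers teardown code in each coroutine
        return False


def recorder(events):
    try:
        while True:
            events.append((yield))
    finally:
        events.append('closed')  # teardown, runs when close() is called


events = []
first = recorder(events)
next(first)  # prime the generator

with ClosingPipeline([first]) as process:
    for data in ('x', 'y'):
        process(data)

print(events)  # ['x', 'y', 'closed']
```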
__init__(targets)
Initialize a new pipeline with coroutines.
targets is an iterable containing the coroutines of the pipeline; the first item must be the input coroutine.
run(inputs)
Run the pipeline over the inputs iterator.
Send data from inputs to the pipeline until there is no more data in inputs or an exception occurs.
clichain.pipeline.coroutine(func)
coroutine decorator, ‘primes’ the coroutine function.
This function is intended to be used as a decorator to create a basic coroutine function, for instance:
    @coroutine
    def cr(*args, **kw):
        print('starting...')
        try:
            while True:
                item = yield
                print(f'processing: {item}')
        except GeneratorExit:
            print('ending...')
Calling the decorated function will automatically advance it to the first yield statement.
    >>> cr()
    starting...
Note
The decorated function is wrapped using functools.wraps.
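For readers unfamiliar with "priming", a decorator of this kind can be sketched in a few lines. The name primed is hypothetical and this is not necessarily how clichain implements coroutine; it shows the common pattern of calling next() once and preserving metadata with functools.wraps.

```python
import functools


def primed(func):
    """Return a wrapper that advances the generator to its first yield."""
    @functools.wraps(func)
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        next(gen)  # run up to the first ``yield``
        return gen
    return wrapper


@primed
def echo(log):
    log.append('starting...')
    try:
        while True:
            item = yield
            log.append(f'processing: {item}')
    except GeneratorExit:
        log.append('ending...')


log = []
cr = echo(log)   # 'starting...' is recorded immediately
cr.send('x')     # no explicit next() call is needed first
cr.close()
print(log)       # ['starting...', 'processing: x', 'ending...']
```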
clichain.pipeline.create(tasks, output=<built-in function print>, **kw)
Create a pipeline of coroutines from a specification.
A pipeline is a succession of coroutines organized into one or several branches.
output is a strategy to use for the pipeline output; the default strategy is print. output will be called for each piece of data reaching the pipeline’s output, and takes a single argument.
Extra keyword args will be used to initialize the Context object that will be sent to all the coroutines of the pipeline.
The tasks argument describes the pipeline and consists of a mapping of coroutines as key: value pairs, where each key identifies a single coroutine.
Each coroutine is defined either by a single coroutine function (see the task field below) or by a dictionary, which contains the following fields:
- task: the coroutine function to use
  See also
  task decorator
  The coroutine function will be called with the following keyword arguments:
  - context: the Context object shared by all the coroutines of the pipeline.
  - targets: an iterable containing the following coroutines in the pipeline (default is no targets).
  - debug: an optional name, used by task to get a child logger from the Context logger, which will be used to log an error if an exception occurs. The exception will be logged at error level and the exc_info will be passed to the log record. The value will be accessible (and writable) through the Control.name attribute, which can be useful for logging:
    ex:
    logger = logging.getLogger(f'{__name__}.{ctrl.name}')
    Note
    The default value is the coroutine’s key in the pipeline definition (the default will be used if the value is None or an empty string).
- input: (optional) set this coroutine as a ‘target’ of the coroutine(s) defined by input. input can be a single key or an iterable containing keys of other coroutines defined in the pipeline dictionary.
  Note
  A None value will be interpreted as the pipeline’s main input. No value or an empty list is equivalent to None if this coroutine is not specified as the output of another coroutine in the pipeline.
- output: (optional) set the coroutine(s) whose keys are defined in output as a ‘target’ of this coroutine. output can be a single key or an iterable containing keys of other coroutines defined in the pipeline dictionary.
  Note
  A None value will be interpreted as the pipeline’s main output. No value or an empty list is equivalent to None if this coroutine is not specified as the input of another coroutine in the pipeline.
- debug: (optional) a debug name to use in logging if an unhandled exception occurs. See the description above.
Note
Specifying a coroutine by a coroutine function is equivalent to providing a dictionary containing only the task field.
examples:
given we have the following declarations:
    import ast
    import logging

    from clichain.pipeline import coroutine, task

    @coroutine
    def output(targets, **kw):
        try:
            while True:
                # read one item, then forward it to every target
                data = yield
                for t in targets:
                    t.send('RESULT: {}'.format(data))
        except GeneratorExit:
            return

    @task
    def parse(ctrl):
        with ctrl as push:
            while True:
                try:
                    value = ast.literal_eval((yield))
                except (SyntaxError, ValueError):
                    continue
                push(value)

    @task
    def mytask(ctrl, param):
        logger = logging.getLogger(f'{__name__}.{ctrl.name}')
        logger.info('starting task')
        with ctrl as push:
            while True:
                [...]
        logger.info('finishing task')
defining a pipeline composed of a single sequence:
example:
inp >>> a --> b --> c --> d >>> out
here’s how we could define it:
    pipeline = pipeline.create({
        'a': parse(),
        'b': {'task': mytask(1), 'input': 'a'},
        'c': {'task': mytask(2), 'input': 'b'},
        'd': {'task': output, 'input': 'c'},
    })
The created pipeline is a Pipeline object; it can be run over any input generator using its Pipeline.run method, sending data to stdout by default.
define a pipeline with branches:
example:
               +--> B --> C >>> out
    inp >>> A--|
               +--> D --> E >>> out
here’s how we could define it:
    pipeline = pipeline.create({
        'a': {'task': A, 'output': ('b', 'd')},
        'b': B,
        'd': D,
        'c': {'task': C, 'input': 'b'},
        'e': {'task': E, 'input': 'd'},
    })
A redundant specification is not an issue, and the following example is equivalent to the previous one:
    pipeline = pipeline.create({
        'a': {'task': A, 'output': ('b', 'd')},
        'b': {'task': B, 'input': 'a', 'output': 'c'},
        'd': {'task': D, 'input': 'a', 'output': 'e'},
        'c': {'task': C, 'input': 'b', 'output': None},
        'e': {'task': E, 'input': 'd', 'output': ()},
    })
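Why the two definitions are equivalent can be seen by resolving the input and output fields into a single "targets" mapping. The sketch below is not clichain code: resolve_targets is a hypothetical helper, and the tasks are replaced by placeholder strings so the example runs on its own.

```python
def resolve_targets(spec):
    """Return {key: sorted target keys} from 'input'/'output' fields."""

    def keys(value):
        # Accept None, a single key, or an iterable of keys.
        if value is None:
            return []
        if isinstance(value, (str, int)):
            return [value]
        return list(value)

    targets = {key: set() for key in spec}
    for key, item in spec.items():
        if not isinstance(item, dict):
            continue  # bare task: no link declared on this entry
        for source in keys(item.get('input')):
            targets[source].add(key)      # 'input' links source -> key
        for target in keys(item.get('output')):
            targets[key].add(target)      # 'output' links key -> target
    return {key: sorted(found) for key, found in targets.items()}


# Placeholder strings stand in for the real coroutine functions.
minimal = {
    'a': {'task': 'A', 'output': ('b', 'd')},
    'b': 'B',
    'd': 'D',
    'c': {'task': 'C', 'input': 'b'},
    'e': {'task': 'E', 'input': 'd'},
}
redundant = {
    'a': {'task': 'A', 'output': ('b', 'd')},
    'b': {'task': 'B', 'input': 'a', 'output': 'c'},
    'd': {'task': 'D', 'input': 'a', 'output': 'e'},
    'c': {'task': 'C', 'input': 'b', 'output': None},
    'e': {'task': 'E', 'input': 'd', 'output': ()},
}

print(resolve_targets(minimal) == resolve_targets(redundant))  # True
```

Both forms describe the same set of links, so stating a link from either end, or from both, yields the same graph.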
join branches
example: given we want to implement this:
               +--> B --> C --+
    inp >>> A--|              +--> F >>> out
               +--> D --> E --+
here’s how we could define it:
    pipeline = pipeline.create({
        'a': {'task': A, 'output': ('b', 'd')},
        'b': B,
        'c': {'task': C, 'input': 'b', 'output': 'f'},
        'd': D,
        'e': {'task': E, 'input': 'd', 'output': 'f'},
        'f': F,
    })
control branches order
the order in which coroutines are initialized, called and closed is reproducible.
to control the data flow order between several branches just use the keys in the pipeline definition, as they will be sorted, like in the following example:
               +--> (1) X --+
               +--> (2) X --+
    inp >>> A--+--> (3) X --+--> B >>> out
               +--> (4) X --+
               +--> (5) X --+
here’s how we could define it:
    pipeline = pipeline.create({
        'a': A,
        1: {'task': X, 'input': 'a', 'output': 'b'},
        2: {'task': X, 'input': 'a', 'output': 'b'},
        3: {'task': X, 'input': 'a', 'output': 'b'},
        4: {'task': X, 'input': 'a', 'output': 'b'},
        5: {'task': X, 'input': 'a', 'output': 'b'},
        'b': B,
    })
The ‘X’ coroutines will be initialized and processed in the expected order: 1, 2, 3, 4, 5 (they will be closed, if no exception occurs, in the opposite order).
loop back
Warning
Looping is currently not implemented and will raise a NotImplementedError when creating the pipeline.
example: given we want to implement this:
               +--> B --> C --+         + >>> out
    inp >>> A--|              +--> N -- +
               +--> D --> E --+  |      |
                                 |      +--> F --+
                                 |               |
                                 +---------------+
here’s how we could define it:
    pipeline = pipeline.create({
        'a': {'task': A, 'output': ('b', 'd')},
        'b': {'task': B, 'output': 'c'},
        'c': {'task': C},
        'd': {'task': D, 'output': 'e'},
        'e': {'task': E},
        'n': {'task': N, 'input': ('c', 'e'), 'output': None},
        'f': {'task': F, 'input': 'n', 'output': 'n'},
    })
Warning
Defining a loop can end up in infinite recursion. No check is made for this, so it’s up to the task implementations to handle it.
specify coroutines name
in some contexts we may want to define a name for a coroutine which is different from its key.
example: the previous example with ordered branches was:
               +--> (1) X --+
               +--> (2) X --+
    inp >>> A--+--> (3) X --+--> B >>> out
               +--> (4) X --+
               +--> (5) X --+
here’s how we could define it:
    pl = {
        'a': A,
        'b': B,
    }
    pl.update({
        i: {'task': X,
            'input': 'a',
            'output': 'b',
            'debug': f"the X task number {i}"}
        for i in range(1, 6)
    })
    pl = pipeline.create(pl)
clichain.pipeline.task(func)
Make “partial” coroutines expected to be used with create.
task will create a “partial” function which, when called with args and keyword args, will actually return a coroutine function designed to be used with the create function.
example:
a basic coroutine adding an offset to input data could be defined as follows using task:
    @task
    def offset(ctrl, offset):
        print('pre-processing')
        with ctrl as push:
            while True:
                value = yield
                push(value + offset)
        # will be executed unless an exception occurs in
        # the 'while' loop
        print('post_processing')
- ctrl will handle the GeneratorExit exception and log any unhandled exception.
- the push method sends data to the next coroutines in the pipeline.
The resulting function is called with the original function’s args and keyword args:
off = offset(offset=1)
off is a partial coroutine function expected to be used in a pipeline definition with create.
The coroutine will eventually be created by calling this new function with specific arguments depending on the pipeline specification (see create for details), ex:
    # create the coroutine
    off = off(targets=[t1, t2...])
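The two-step call pattern (configure first, create the coroutine later) can be sketched with nested closures. This is a simplified stand-in, not clichain's task decorator: partial_task is a hypothetical name, and the decorated function receives the targets directly instead of a Control object.

```python
import functools


def partial_task(func):
    """First call stores user args, second call creates the coroutine."""
    @functools.wraps(func)
    def configure(*args, **kw):
        @functools.wraps(func)
        def start(targets=()):
            gen = func(tuple(targets), *args, **kw)
            next(gen)  # prime the generator
            return gen
        return start
    return configure


@partial_task
def offset(targets, offset):
    while True:
        value = yield
        for target in targets:
            target.send(value + offset)


def collector(store):
    while True:
        store.append((yield))


store = []
sink = collector(store)
next(sink)

off = offset(offset=1)     # step 1: user parameters only
cr = off(targets=[sink])   # step 2: wiring, normally done by the framework
for value in (1, 2):
    cr.send(value)

print(store)  # [2, 3]
```

Separating the two steps lets the same configured task be described in a pipeline definition before the next coroutines exist.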
Note
as for coroutine, all the functions (partial or final functions) are wrapped using functools.wraps
example:
    import ast
    import logging

    from clichain.pipeline import task

    @task
    def output(ctrl):
        with ctrl:
            while True:
                print((yield))

    @task
    def parse(ctrl):
        with ctrl as push:
            while True:
                try:
                    value = ast.literal_eval((yield))
                except (SyntaxError, ValueError):
                    continue
                push(value)

    @task
    def offset(ctrl, offset):
        offset = int(offset)
        logger = logging.getLogger(f'{__name__}.{ctrl.name}')
        logger.info(f'offset: {offset}')
        with ctrl as push:
            while True:
                value = yield
                push(value + offset)
        logger.info('offset task finished, no more value')

    if __name__ == '__main__':
        out = output()
        off1 = offset(10)
        off2 = offset(offset=100)
        parse = parse()
        # the previous results (out, off1, off2, parse) should
        # be used in the pipeline definition and the following
        # should be performed by "create"
        out = out()
        off1 = off1((out,))
        off2 = off2((out,))
        parse = parse([off1, off2])
        with open('foo.txt') as inputs:
            for data in inputs:
                parse.send(data)
        out.close()
        off1.close()
        off2.close()
        parse.close()