4.3.1. Creating your plugin
Warning
This feature is currently in alpha.
A plugin for Ngenea Hub is a Python module that contains one or more Celery tasks that the Ngenea Worker can discover. These tasks can then be used in workflows in Ngenea Hub.
4.3.1.1. Example plugin creation
Using the Ngenea Hub CLI, a template plugin can be created in the plugin directory at /var/lib/ngenea-worker/plugins. All plugins persist in this location and must be in this directory to be installed correctly.
For the following example, we will create a plugin named echo_args that returns the expected keyword arguments provided to the task itself.
To begin, create a plugin template for the example echo_args using:
ngenea-worker plugins create echo_args
This will create the template plugin project at /var/lib/ngenea-worker/plugins/echo_args. To implement custom logic, navigate to /var/lib/ngenea-worker/plugins/echo_args, which contains the Python module file echo_args/echo_args.py. Within this file there is the basic example task example_task:
from typing import List

from celery import shared_task

from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.utils.utils import get_queue_name


@shared_task(bind=True, name="dynamo.custom.example_task")
def example_task(self, *args, paths: List = None, jobid: int = None, **kwargs):
    # Print the inputs passed to the task
    print(paths)
    print(jobid)
    # Collect per-file results for the return payload
    status = FileActionStatus(self.name, paths, jobid, get_queue_name())
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()
Warning
The task's return value must adhere to the format shown above. Otherwise the job will remain in the Pending state indefinitely in the UI/API, and the Ngenea Hub logs will show a traceback in the task callback. Refer to the section Custom Tasks/Return Payload for the specific return format of task results.
For more information on the supported file states, please refer to the section supported file states in the Custom Tasks page.
The logic within this task, along with the name of the function, can now be replaced to create our echo task:
Note
The name provided to shared_task will be the task name that is used to call the task within a workflow.
from typing import List

from celery import shared_task

from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.utils.utils import get_queue_name


@shared_task(bind=True, name="dynamo.custom.echo_args")
def echo_args(self, *args, paths: List = None, jobid: int = None, **kwargs):
    # Report each received path as processed
    status = FileActionStatus(self.name, paths, jobid, get_queue_name())
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()
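The task signature above accepts arbitrary keyword arguments through **kwargs. The following is a minimal sketch, in plain Python with no Celery or Ngenea imports, of how such a signature separates the named parameters (paths, jobid) from any extra keyword arguments. The helper name collect_task_arguments, the example path, and the extra argument names (label, retries) are hypothetical and exist only for illustration; a real plugin task must still return the FileActionStatus payload described above.

```python
from typing import Any, Dict, List, Optional


def collect_task_arguments(
    *args: Any,
    paths: Optional[List[Dict[str, Any]]] = None,
    jobid: Optional[int] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Hypothetical helper: report how the task signature binds its arguments."""
    return {
        "args": list(args),
        # Each entry in paths is a dict with a "path" key, as in the task above
        "paths": [entry["path"] for entry in (paths or [])],
        "jobid": jobid,
        # Anything not named in the signature ends up in kwargs
        "extra": dict(kwargs),
    }


if __name__ == "__main__":
    received = collect_task_arguments(
        paths=[{"path": "/mmfs1/data/file1.txt"}],  # assumed example path
        jobid=42,
        label="demo",  # hypothetical extra keyword argument
        retries=3,  # hypothetical extra keyword argument
    )
    print(received)
```

Running this locally can help confirm which values would arrive in kwargs before wiring the logic into the Celery task.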
With the new task in place, the entry point for the plugin will have to be adjusted in setup.py.
Note
Ensure that all custom task entry points are defined under worker_plugin.
Initial setup.py entry points:
...
    "worker_plugin": [
        "echo_args=echo_args.echo_args:example_task",
    ]
...
In this case, the changes are as follows:
...
    "worker_plugin": [
        "echo_args=echo_args.echo_args:echo_args",
    ]
...
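Each worker_plugin entry follows the standard Python entry point form name=module.path:attribute. As a sketch of how that string decomposes, the standard library's importlib.metadata.EntryPoint (Python 3.9 or later for the module and attr properties) can parse it. This only illustrates the syntax; it is not a description of how Ngenea Worker performs its own discovery, which this page does not specify.

```python
from importlib.metadata import EntryPoint

# Same name and value as the updated worker_plugin entry above
entry = EntryPoint(
    name="echo_args",
    value="echo_args.echo_args:echo_args",
    group="worker_plugin",
)

# Module to import: <package>.<module file without the .py suffix>
print(entry.module)
# Attribute looked up inside that module: the task function
print(entry.attr)
```

If the function is renamed but the part after the colon is left unchanged, the attribute lookup would no longer match the task, which is why the entry point has to be adjusted together with the function name.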
4.3.1.2. External Dependencies
If the logic in the plugin requires an external dependency, it can be installed into the worker using the setup.py within the template plugin generated via ngenea-worker plugins create. To add dependencies, list them in the install_requires section:
Note
Ensure that the celery package and its version constraint are left unedited within the plugin requirements. Changing them may have unexpected effects on the functionality of Ngenea Worker.
install_requires=[
    "celery<=5.4",
    "example-module==0.1.0",
]
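Because an edited celery requirement can affect Ngenea Worker, it may be worth checking the list before installing the plugin. The following is a minimal sketch under stated assumptions: the helper names requirement_name and celery_pin_unchanged are hypothetical, the expected pin celery<=5.4 is copied from the example above and may differ in your generated template, and the comparison is a plain string match rather than a full requirement parser.

```python
import re
from typing import Iterable

# Leading run of characters allowed in a distribution name
_NAME = re.compile(r"^[A-Za-z0-9._-]+")


def requirement_name(requirement: str) -> str:
    """Hypothetical helper: extract the lowercased package name."""
    match = _NAME.match(requirement.strip())
    return match.group(0).lower() if match else ""


def celery_pin_unchanged(
    install_requires: Iterable[str], expected: str = "celery<=5.4"
) -> bool:
    """Hypothetical check: exactly one celery requirement, equal to the expected pin."""
    celery_reqs = [
        req.replace(" ", "").lower()
        for req in install_requires
        if requirement_name(req) == "celery"
    ]
    return celery_reqs == [expected.replace(" ", "").lower()]


if __name__ == "__main__":
    requirements = ["celery<=5.4", "example-module==0.1.0"]
    print(celery_pin_unchanged(requirements))
```

A check like this could sit in a plugin's own test suite so that an accidental edit to the celery line is caught before the plugin reaches the worker.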
4.3.1.3. Installing custom plugins
To install the newly created plugin, refer to the plugin installation page.