5.3.1. Creating your plugin¶
Warning
This feature is currently in alpha.
Ngenea Hub allows the integration of custom Celery tasks through a plugin system. These plugins are Python modules recognized by the Ngenea Worker and can be used within defined workflows in Ngenea Hub.
5.3.1.1. Creating a Custom Plugin: echo_args
¶
Ngenea Hub supports plugin development through its CLI, allowing users to extend functionality via custom task plugins. This section walks through creating a sample plugin named echo_args
, which simply returns the arguments provided to it.
Plugin Directory
All custom plugins must reside in the following directory for correct installation and execution:
/var/lib/ngenea-worker/plugins
Each plugin will persist in this location, and Ngenea Hub will reference this path to discover and load available plugins.
Step 1: Create the Plugin Template
To generate the basic structure for the echo_args
plugin, use the following command:
ngenea-worker plugins create echo_args
This command creates the plugin at: /var/lib/ngenea-worker/plugins/echo_args
.
Inside this directory, you will find a Python module at: /var/lib/ngenea-worker/plugins/echo_args/echo_args/echo_args.py
.
This module contains a default task function named example_task
, which we will customize in the next step.
Step 2: Implement the Plugin Logic
The default example_task
function is provided as a template. Here’s the original sample code:
from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name
@app.task(bind=True, name="dynamo.custom.example_task")
def example_task(self, *args, paths: List = None, jobid: int = None, **kwargs):
print(paths)
print(jobid)
status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
for path in paths:
status.add("processed", path["path"])
return status.dump()
Important
Custom tasks must return their results using the FileActionStatus
object, as shown above.
Failure to follow this return structure will result in the task being stuck in a Pending state,
with tracebacks visible in the Ngenea Hub logs.
Refer to Custom Tasks > Return Payload for required return formats.
Step 3: Modify the Task for echo_args
Update the function and task name to reflect the custom behavior of our plugin. The revised implementation should look like this:
Note
The name provided to app.task
will be the task name that is used to call the task within a workflow.
from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name
@app.task(bind=True, name="dynamo.custom.echo_args")
def echo_args(self, *args, paths: List = None, jobid: int = None, **kwargs):
status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
for path in paths:
status.add("processed", path["path"])
return status.dump()
Note
The value provided to the name
parameter in the @shared_task
decorator determines the task name
that will be used within workflow definitions.
Step 4: Update the setup.py Entry Point
For Ngenea Hub to recognize and register your custom task, you must define an appropriate entry point in your plugin’s setup.py
file. Entry points for custom tasks should be declared under the worker_plugin
group.
Original setup:
"worker_plugin": [
"echo_args=echo_args.echo_args:example_task",
]
Updated setup (to reflect the new function name):
"worker_plugin": [
"echo_args=echo_args.echo_args:echo_args",
]
5.3.1.2. External Dependencies¶
If your plugin relies on external Python packages, you can manage these dependencies through the setup.py
file, which is automatically included in the plugin template created using:
ngenea-worker plugins create
To specify additional dependencies, add them to the install_requires
list within setup.py
. This ensures the required packages are installed into the Ngenea Worker environment when the plugin is deployed.
Example:
install_requires = [
"celery<=5.4",
"example-module==0.1.0"
]
Note
Do not modify the version or presence of the celery
package in this list.
Altering celery
may lead to compatibility issues or disrupt core Ngenea Worker functionality.
By managing dependencies in this way, you ensure plugin compatibility, reduce runtime errors, and maintain seamless integration with the Ngenea Worker framework.