4.3.4. Example Plugins
Warning
This feature is currently in alpha
Here are a couple of examples of functionality that can be added to the hub; these can be dropped into any template project.
4.3.4.1. Email notification task
The intent of this plugin is to email a list of staff members at the end of a workflow, ensuring that they are informed of its completion. It takes extra keyword arguments that allow details such as the subject to be customised:
import sys
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from smtplib import SMTP
from typing import List

from celery import shared_task

from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name


def send_email(email_to: List[str], email_from: str, subject: str, message: str, server: str = "localhost"):
    """
    Sends an email through an already configured SMTP server

    :param email_to: List of email recipients
    :param email_from: Address the email is sent from
    :param subject: Email subject
    :param message: Message to send
    :param server: Target SMTP server address
    :return: None
    """
    smtp_message = MIMEMultipart()
    smtp_message['From'] = email_from
    smtp_message['To'] = ', '.join(email_to)
    smtp_message['Date'] = formatdate(localtime=True)
    smtp_message['Subject'] = subject
    smtp_message.attach(MIMEText(message))
    try:
        smtp = SMTP(server)
        smtp.sendmail(email_from, email_to, smtp_message.as_string())
        smtp.close()
    except OSError as error:
        # All SMTP errors derive from OSError, so catching OSError also
        # covers socket-level failures such as an unreachable server
        print(f'Error sending notification email: {error}')


@shared_task(bind=True, name="dynamo.custom.email_staff")
def email_staff(
    self,
    *args,
    paths: List = None,
    jobid: int = None,
    staff_members: List = None,
    message: str = None,
    subject: str = None,
    server: str = None,
    from_address: str = None,
    **kwargs
):
    # Fall back to sensible defaults for any keyword arguments the
    # workflow definition does not supply
    if not message:
        message = f"Job {jobid} has completed successfully processing {len(paths)} paths"
    if not from_address:
        from_address = "ngenea-worker@pixitmedia.com"
    if not subject:
        subject = "Ngenea Worker Job completed successfully"
    if not server:
        server = "localhost"
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    send_email(email_to=staff_members, email_from=from_address, subject=subject, message=message, server=server)
    return status.dump()


if __name__ == "__main__":
    sys.exit(0)  # pragma: no cover
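Before wiring the task into a workflow, it can be worth confirming that the worker host can reach an SMTP relay at all. The following is a minimal smoke test run from a Python shell, assuming the plugin module is importable as email_plugin (a hypothetical name) and that a relay is listening on localhost:

from email_plugin import send_email  # hypothetical module name

send_email(
    email_to=["admin@organisation.com"],
    email_from="ngenea-worker@pixitmedia.com",
    subject="SMTP connectivity test",
    message="If this arrives, the worker host can send notification emails.",
)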
The setup.py will need editing so that the entry point is the email_staff function in the module created for this example plugin, as sketched below.
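As a minimal sketch, assuming the plugin module is named email_plugin (hypothetical) and that the template project registers tasks through a setuptools entry point, the relevant section of setup.py might look like the following. The entry-point group name dynamo.plugins is illustrative only; reuse whichever group the template project's existing setup.py declares:

from setuptools import setup

setup(
    name="ngenea-email-plugin",   # hypothetical package name
    version="0.1.0",
    py_modules=["email_plugin"],  # hypothetical module containing email_staff
    entry_points={
        # Illustrative group name -- copy the group the template project uses
        "dynamo.plugins": [
            "email_staff = email_plugin:email_staff",
        ],
    },
)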
To make use of this new task in the hub, here is an example workflow that will email staff members after all the data has been migrated, with a custom subject, recipient list, and sender address:
{
    "name": "migrate_notif",
    "label": "Migrate with notif",
    "icon_classes": [
        "fa fa-cloud fa-stack-2x text-primary",
        "fa fa-refresh fa-stack-1x text-light"
    ],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.tasks.migrate"
                },
                {
                    "name": "dynamo.custom.email_staff",
                    "staff_members": [
                        "johnsmith@organisation.com",
                        "admin@organisation.com"
                    ],
                    "subject": "Project number 7 job complete",
                    "from_address": "notifications@organisation.com"
                }
            ]
        }
    ]
}
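As this example shows, any additional keys on an action entry (here staff_members, subject and from_address) are forwarded to the task as keyword arguments, which is how the defaults defined in email_staff are overridden.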
4.3.4.2. Running additional script
After running a set of tasks, you may need to execute a script on the host machine of the Ngenea Worker instance.
The following plugin allows the execution of an arbitrary script, located by default at /opt/cloud_script.sh:
Note
Any script run through this plugin will be run with root access
import sys
from subprocess import run as run_script
from subprocess import TimeoutExpired, SubprocessError, PIPE, STDOUT
from typing import List

from celery import shared_task

from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

DEFAULT_SCRIPT = "/opt/cloud_script.sh"


@shared_task(bind=True, name="dynamo.custom.run_cloud_script")
def run_cloud_script(
    self,
    *args,
    paths: List = None,
    jobid: int = None,
    script_location: str = None,
    timeout: int = 600,
    additional_args: List = None,
    use_paths: bool = False,
    **kwargs
):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    try:
        # Build the command line under its own name to avoid shadowing *args
        command = [script_location if script_location else DEFAULT_SCRIPT]
        if additional_args:
            command = command + additional_args
        if use_paths:
            # Append the job's paths to the arguments
            command = command + [path["path"] for path in paths]
        result = run_script(
            command,
            stdout=PIPE,
            stderr=STDOUT,
            check=True,
            timeout=timeout,
        )
        status.add_log(result.stdout.decode())
        # check=True raises CalledProcessError on a non-zero exit code,
        # so reaching this point means the script succeeded
        for path in paths:
            status.add("processed", path["path"])
    except TimeoutExpired:
        status.add_log("Called script timed out")
    except SubprocessError as sp_err:
        status.add_log(str(sp_err))
    return status.dump()


if __name__ == "__main__":
    sys.exit(0)  # pragma: no cover
The setup.py will need editing so that the entry point is the run_cloud_script function in the module created for this example plugin, in the same way as the setup.py sketch shown for the email task.
To make use of this new task in the hub, here is an example workflow that will run the default cloud script after all of the data has been migrated:
{
    "name": "migrate_cloud_script",
    "label": "Migrate with cloud script",
    "icon_classes": [
        "fa fa-cloud fa-stack-2x text-primary",
        "fa fa-refresh fa-stack-1x text-light"
    ],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.tasks.migrate"
                },
                {
                    "name": "dynamo.custom.run_cloud_script"
                }
            ]
        }
    ]
}
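The keyword arguments of run_cloud_script can be overridden from the workflow in the same way as in the email example. As an illustrative sketch (the script path and flag below are hypothetical), the action entry could instead read:

{
    "name": "dynamo.custom.run_cloud_script",
    "script_location": "/opt/other_script.sh",
    "timeout": 120,
    "additional_args": ["--verbose"],
    "use_paths": true
}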