Module tracker

This module contains the process tracker API. The process tracker is an essential part of the Asynchronous Task Processing (ATP) subsystem.
Classes
  QueueException
Task queue base exception.
  QueueEmpty
Queue exception signalling that the task queue is empty and no task can be pulled from it.
  QueueFull
Queue exception signalling that the task queue is full and no task can be pushed to it.
  DummyLock
Dummy (default) lock class implementing the lock interface.
  TaskStatus
TaskStatus provides an interface to the current asynchronous task.
Functions
 
dummyHandler(taskStatus, input)
Dummy ATP handler.
 
dbLocker(dbLock, func, *prm, **kprm)
Grant exclusive DB access while executing the passed function.
 
registerTaskType(identifier, handler, timeout=3600, timeret=-1, maxrestart=2)
Register new task type.
 
unregisterTaskType(identifier, force=False)
Unregister (remove) an existing task Type.
 
_logStatusChange(obj, message)
auxiliary function
 
getQueueSize()
Get number of enqueued tasks.
 
getMaxQueueSize()
Get the maximum allowed number of tasks the queue can hold.
 
getTaskInfo(task_id)
Get a tuple of the Type identifier, Instance identifier, Instance status and corresponding status string.
 
getTaskIdentifier(task_id)
Get tuple of Type and Instance identifiers.
 
getTaskStatus(task_id)
Get tuple of Instance status and corresponding status string.
 
getTaskStatusByIdentifier(type, identifier)
Get tuple of Instance status and corresponding status string.
 
deleteTask(task_id)
Delete task Instance.
 
deleteTaskByIdentifier(type, identifier)
Delete task Instance.
 
enqueueTask(type, identifier, input, message='')
Create new task Instance of the given Type using the given identifier and task inputs and enqueue this task for processing.
 
reenqueueTask(task_id, message='')
Re-enqueue an existing task Instance identified by the given DB record ID and set its status to ACCEPTED.
 
dequeueTask(serverID, message='')
Attempt to dequeue a single task from the task queue.
 
startTask(task_id, message='')
Get the inputs of the task Instance identified by the given DB record ID...
 
_setTaskStatus(task_id, message, status)
auxiliary function
 
_getTaskStatus(task_id)
auxiliary function
 
stopTaskSuccessIfNotFinished(task_id, message='')
Set status of task Instance identified by the given DB record ID to FINISHED if its status has not been set to FINISHED or FAILED yet.
 
stopTaskSuccess(task_id, message='')
Set status of task Instance identified by the given DB record ID to FINISHED.
 
stopTaskFailure(task_id, message='')
Set status of task instance identified by the given DB record ID to FAILED.
 
pauseTask(task_id, message='')
Set status of task instance identified by the given DB record ID to PAUSED.
 
resumeTask(task_id, message='')
Set status of task instance identified by the given DB record ID to RUNNING.
 
setTaskResponse(task_id, response, mimeType='text/xml')
Set response of task Instance identified by the given DB record ID.
 
getTaskResponse(type, identifier)
Return a tuple of task response and its MIME type.
 
getTaskLog(type, identifier)
Return a list of log records sorted by time for the task identified by the task Type and Instance identifiers.
 
reenqueueZombieTasks(message='')
Find all tasks exceeding their time-out and try to re-enqueue them.
 
deleteRetiredTasks()
Find all FINISHED or FAILED task Instances exceeding their retention time and remove them.
Variables
  MAX_QUEUE_SIZE = 64
Actual queue size limit.
  __package__ = 'eoxserver.resources.processes'
  key = 'ACCEPTED'
  val = 1
Function Details

dummyHandler(taskStatus, input)

Dummy ATP handler. No action implemented. Prototype of an ATP handler subroutine. Any ATP handler receives two parameters:
  * 'taskStatus' - an instance of the TaskStatus class providing access to the actual task,
  * 'input' - the input parameters specified when the task was enqueued.

dbLocker(dbLock, func, *prm, **kprm)

Grant exclusive DB access while executing the passed function. The 'dbLocker' function calls the 'dbLock.acquire()' and 'dbLock.release()' methods on entry and exit, respectively, ensuring that the executed function 'func' has exclusive access to the DB. 'prm' and 'kprm' are the optional positional and keyword parameters passed to 'func'. The 'dbLocker' function returns the return value of 'func'.
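
The acquire/call/release pattern described above can be sketched as follows. This is a minimal illustration, not the module's source: the name 'db_locker_sketch' is hypothetical, 'threading.Lock' stands in for whatever lock object is actually passed as 'dbLock', and releasing the lock when 'func' raises is an assumption the text does not confirm.

```python
import threading


def db_locker_sketch(db_lock, func, *prm, **kprm):
    """Run func(*prm, **kprm) while holding db_lock and return its result."""
    db_lock.acquire()  # on entry
    try:
        return func(*prm, **kprm)
    finally:
        # Assumption: the lock is released even if func raises.
        db_lock.release()  # on exit


def add(a, b=0):
    """Stand-in for a function that needs exclusive DB access."""
    return a + b


lock = threading.Lock()
result = db_locker_sketch(lock, add, 1, b=2)
```

Any object exposing 'acquire()' and 'release()' can be used in place of 'threading.Lock' in this sketch.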

registerTaskType(identifier, handler, timeout=3600, timeret=-1, maxrestart=2)

Register new task type. The task type 'identifier' string and 'handler' subroutine must be specified. The string identifier must uniquely identify the created task type. The optional parameters are: the task 'timeout' in seconds, after which the task is restarted (re-enqueued for new processing); the retention time 'timeret', i.e., the time to keep finished tasks stored in the DB (for any non-positive number the task is kept forever); and 'maxrestart', the maximum allowed number of task restarts caused by task time-out (when the number of restarts is exceeded, the task is labelled as FAILED and is not re-enqueued any more). When called repeatedly with the same task identifier, the first call creates a new task type and the subsequent calls update the task type parameters.
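
A possible registration call is sketched below. The helper name 'register_demo_type' and the identifier 'demo.sequence_sum' are hypothetical. The 'tracker' argument is assumed to be this module (or a stand-in with the same functions), and 'handler' is passed through unchanged because the description above does not specify its exact form.

```python
def register_demo_type(tracker, handler, identifier="demo.sequence_sum"):
    """Register (or, on repeated calls, update) a task type.

    'tracker' is assumed to provide registerTaskType() with the
    signature documented above.
    """
    tracker.registerTaskType(
        identifier,
        handler,
        timeout=600,   # restart the task after 600 seconds
        timeret=-1,    # non-positive: keep finished tasks forever
        maxrestart=1,  # label the task FAILED after more than one restart
    )
    return identifier
```

Because repeated calls with the same identifier update the existing type, such a helper can be called on every start-up without first checking whether the type exists.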

unregisterTaskType(identifier, force=False)

Unregister (remove) an existing task Type. By default, the removal of a task Type fails as long as any task Instance of that Type exists, raising the 'django.db.models.ProtectedError' exception (a subclass of django.db.IntegrityError). To force the Type removal, wiping out all the linked Instances, set the 'force' parameter to True.

getTaskStatus(task_id)

Get tuple of Instance status and corresponding status string. 'task_id' is the DB record ID.

getTaskStatusByIdentifier(type, identifier)

Get tuple of Instance status and corresponding status string. 'type' is the Type string ID and 'identifier' is the Instance string ID.

deleteTask(task_id)

Delete task Instance. 'task_id' is the DB record ID.

deleteTaskByIdentifier(type, identifier)

Delete task Instance. 'type' is the Type string ID and 'identifier' is the Instance string ID.

enqueueTask(type, identifier, input, message='')

Create new task Instance of the given Type using the given identifier and task inputs and enqueue this task for processing. The task status is set to ACCEPTED. The 'type' parameter should be the string identifier of a registered task type. The string 'identifier' shall uniquely identify the created task. The 'input' can be any Python object serializable by the 'pickle' module. An optional log 'message' can be specified. If the task queue is full, the QueueFull exception is raised.

reenqueueTask(task_id, message='')

Re-enqueue an existing task Instance identified by the given DB record ID and set its status to ACCEPTED. An optional log message can be specified. The task is always enqueued, which can increase the task queue size beyond the queue size limit.

dequeueTask(serverID, message='')

Attempt to dequeue a single task from the task queue. A unique serverID must be provided to prevent collisions with the other ATPDs pulling tasks from the same queue. The function returns a list of the dequeued tasks. In rare cases the function may return zero tasks or more than one task, and the caller must handle both cases. The status of the returned tasks is set to SCHEDULED. If the queue is empty, the QueueEmpty exception is raised.

startTask(task_id, message='')

Get the inputs of the task Instance identified by the given DB record ID and set the task's status to RUNNING.
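
The start and stop calls can be combined into a small task life cycle, sketched below under several assumptions: 'tracker' is this module (or a stand-in), 'work' is a hypothetical caller-supplied callable, and the exact shape of the value returned by 'startTask' is not specified here, so the sketch passes it on unchanged.

```python
def run_task(tracker, task_id, work):
    """Start a task, run 'work' on its inputs and record the outcome.

    Returns True if 'work' completed and False if it raised.
    """
    # Sets the task status to RUNNING and fetches the task inputs.
    payload = tracker.startTask(task_id, message="started by sketch")
    try:
        work(payload)
    except Exception as exc:
        tracker.stopTaskFailure(task_id, message=str(exc))
        return False
    # Leaves the status alone if it is already FINISHED or FAILED.
    tracker.stopTaskSuccessIfNotFinished(task_id, message="done")
    return True
```

Using 'stopTaskSuccessIfNotFinished' rather than 'stopTaskSuccess' avoids overwriting a FINISHED or FAILED status that was already set while 'work' was running.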

setTaskResponse(task_id, response, mimeType='text/xml')

Set response of task Instance identified by the given DB record ID. The response is expected to be a Python string (text). However, binary data (such as pickled data) may be used as well. It is safe to call this function repeatedly. The first call creates a new Response record and subsequent calls update the existing Response record.

getTaskResponse(type, identifier)

Return a tuple of task response and its MIME type. The task Instance is identified by a unique pair of Type and Instance string identifiers, 'type' and 'identifier', respectively. The response is expected to be a Python string (text). However, binary data (such as pickled data) may be used as well.
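
Storing and reading back a pickled result could look like the sketch below. The helper names and the use of 'application/octet-stream' as a marker for pickled data are assumptions, and 'tracker' is assumed to be this module (or a stand-in exposing 'setTaskResponse' and 'getTaskResponse').

```python
import pickle

PICKLE_MIME = "application/octet-stream"  # assumed marker for pickled data


def store_result(tracker, task_id, result):
    """Pickle 'result' and store it as the task response."""
    tracker.setTaskResponse(task_id, pickle.dumps(result),
                            mimeType=PICKLE_MIME)


def load_result(tracker, task_type, identifier):
    """Fetch the task response and unpickle it."""
    response, mime_type = tracker.getTaskResponse(task_type, identifier)
    if mime_type != PICKLE_MIME:
        raise ValueError("unexpected MIME type: %r" % (mime_type,))
    # Only unpickle data written by trusted code.
    return pickle.loads(response)
```

Note the asymmetry in the API: the response is written using the DB record ID but read using the Type and Instance string identifiers.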

getTaskLog(type, identifier)

Return a list of log records sorted by time for the task identified by the task Type and Instance identifiers. Each log record is a tuple of three fields: time-stamp, status tuple (see 'getTaskStatus()'), and logged message.
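
The record layout described above (time-stamp, status tuple, message) can be unpacked as in the sketch below. The helper name 'format_task_log' and the output format are hypothetical, and the time-stamp is converted with plain string formatting because its type is not stated here.

```python
def format_task_log(records):
    """Turn task log records into printable lines.

    Each record is (time-stamp, (status, status string), message).
    """
    lines = []
    for timestamp, (status, status_text), message in records:
        lines.append("%s [%s:%s] %s" % (timestamp, status, status_text, message))
    return lines


# Hand-written sample records in the documented layout.
sample = [
    ("2013-01-01 10:00:00", (1, "ACCEPTED"), "enqueued"),
]
formatted = format_task_log(sample)
```

In real use, 'records' would be the return value of 'getTaskLog(type, identifier)'.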

reenqueueZombieTasks(message='')

Find all tasks exceeding their time-out and try to re-enqueue them. Tasks exceeding the allowed number of restarts are rejected and marked as FAILED.
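
A periodic maintenance step might combine this function with 'deleteRetiredTasks()', as in the sketch below. The helper name 'housekeeping' and the order of the two calls are assumptions, and 'tracker' is assumed to be this module (or a stand-in).

```python
def housekeeping(tracker):
    """Run one maintenance pass over the task queue."""
    # Re-enqueue tasks that exceeded their time-out; tasks out of
    # restarts are marked as FAILED by the tracker.
    tracker.reenqueueZombieTasks(message="re-enqueued by housekeeping")
    # Remove FINISHED or FAILED tasks past their retention time.
    tracker.deleteRetiredTasks()
```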

Variables Details

MAX_QUEUE_SIZE

Actual queue size limit. Note: this variable may be removed in the future. Use 'getMaxQueueSize()' instead.
Value:
64
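
Reading the limit through the accessor rather than the variable could look like the sketch below. The helper name 'free_slots' is hypothetical and 'tracker' is assumed to be this module (or a stand-in exposing 'getQueueSize' and 'getMaxQueueSize').

```python
def free_slots(tracker):
    """Return how many more tasks the queue can accept, never below 0."""
    # The queue size can exceed the limit, since reenqueueTask() always
    # enqueues, so the difference is clamped at zero.
    return max(0, tracker.getMaxQueueSize() - tracker.getQueueSize())
```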