[Pulp-list] Sanity Check

Jeff Ortel jortel at redhat.com
Thu Sep 23 13:58:02 UTC 2010


All,

I'd like to get a quick sanity check on how I designed the dispatching of API methods 
(that rely on RMI on the agent) through the task subsystem.  The approach is simple 
enough.  Hopefully, the long write up will not make it seem more complicated than it 
really is :)

First, add classes to pulp.server.async.py that mirror the pulp.server.agent.Agent class 
but performs RMI asynchronously by default and specifies the same correlation tag as the 
reply listener.  The  pulp.server.agent.Agent does synchronous RMI by default and this 
way, the replies are guaranteed to match what the reply listener is listening on.

Second, the ReplyListener is added to pulp.server.async.py.  It's job is to listen for 
asynchronous RMI replies based on the same correlation tag specified when the RMI was 
invoked as described above.  When replies are received, the associated task is updated by 
calling either Task.succeeded() or Task.failed().  The task is also updated with the RMI 
returned value or exception as appropriate.

Third, I extended the pulp.server.tasking.task.Task as AsyncTask.  Unlike its superclass, 
it expects its callable to be asynchronous.  Further, when the task runs the callable, the 
task's state is not advanced in anticipation that its succeeded() or failed() methods will 
be called by an external event.

Last, and most significant, is how the API classes leverage (1-3).  The API methods such 
as ConsumerApi.installpackages() need to do something on the agent and then update 
consumer history if the agent (RMI) was successful.  This is what makes this kind of 
tricky.  So, what I decided to implement this by subclassing the AsyncTask as 
pulp.server.api.consumer.InstallPackages.  This specialized task is created/enqueued and 
returned by ConsumerApi.installpackages() giving InstallPackages.install() as the task's 
target callable.  When invoked, it sends the RMI.  Then, when the reply comes in and 
Task.succeeded() is called by the queue.  This method is overridden in InstallPackages 
which calls super.succeeded() and then updates the consumer history.

It looks like this:

WS(controller)-->ConsumerApi.installpackages()-->FIFOQueue.enqueue(task)
   [ task runs Task.callable() ]
InstallPackages.install()-->AsyncAgent.packages.install()
   [ RMI to agent ]
   [ agent reply received on QPID queue ]
ReplyListener.succeeded(reply)
   -->FIFOQueue.find(taskid)
   -->InstallPackages.succeeded(reply)
   -->Super.succeeded(reply)
   -->ConsumerHistoryApi.packages_installed()

Done.

The goal was to provide for arbitrary complexity on how the API was implemented which 
includes doing the async RMI to the agent and then following up with calls to update the 
consumer history and so forth.  Mixing synchronous and asynchronous logic within the API 
implementation makes it a little trickier.  I also wanted to minimize the async (tasking & 
RMI) plumbing in the API files.


Hope this makes sense.  It's pretty straight forward but we can discuss on a call if we 
need to.  Thoughts?

-jeff

-------------- next part --------------
A non-text attachment was scrubbed...
Name: smime.p7s
Type: application/pkcs7-signature
Size: 5126 bytes
Desc: S/MIME Cryptographic Signature
URL: <http://listman.redhat.com/archives/pulp-list/attachments/20100923/36d1a605/attachment.p7s>


More information about the Pulp-list mailing list