CPS Transaction extensions

Author: Julien Anguenot
Revision: $Id$

Contents

Status, Draft v1

1   Introduction and motivations

The first motivation behind this extension was performance. On large-scale projects, where a huge number of items are stored in the catalog, we realized that operations such as creation, editing, deletion or workflow actions were taking more and more time as the catalog grew.

The first bottleneck was the CMF and DCWorkflow, which perform a lot of suboptimal or even useless indexing throughout a transaction. My first reflex was to check whether I could reduce the number of indexation calls and see how many indexations, at most, I could end up with. It ended up being a hack, and I realized that I couldn't be sure of the number of indexations that would occur for a given transaction anyway: I couldn't prevent third-party code using the CPS framework from making indexation calls, and thus from degrading the overall performance of CPS-based applications.

The first step was to extend the ZODB transaction mechanism to support commit hooks that would execute transactional code just before the actual transaction commit time, but at the end of the "regular" transaction execution. See section 2 for more information about this.

With this ZODB transaction extension, we defined the indexation manager which was a real win for performance, since we ended up having an atomic indexation, whatever the transaction was dealing with.

We realized soon after that we could use additional commit hooks to manage other suboptimal internal CPS tasks. CPS extends the CMF and provides an event service machinery that receives events and dispatches them to interested parties. So we defined an event subscriptions manager, dispatching user notifications based on events, and a tree cache manager, maintaining a tree cache for obvious performance reasons. It seems that the Plone guys never really got that <wink>. The main idea is to sort, filter and process events as little as possible, keeping in mind that events are sent several times on the event channel at runtime and that we do not have complete control over this flow: think about third-party code.

We defined a commit hook ordering policy for our CPS managers because it was clear that we would face execution dependencies between subscribers. See section 3 for more information about this.

Section 4 is a small developer FAQ.

Note about the terms used in this document:

2   ZODB before commit hooks

2.1   Introduction

A ZODB transaction allows hooks to be registered that are called before the transaction is committed.

The specified hook functions will be called after the transaction commit method has been called, but before the commit process has been started.

Multiple hooks can be registered and will be called in the order they were registered (first registered, first called). The registration method (addBeforeCommitHook, see section 2.2) can also be called from a hook: an executing hook can register more hooks. Applications should take care to avoid creating infinite loops by recursively registering hooks.

Hooks are called only for a top-level commit. A sub-transaction commit or savepoint creation does not call any hooks. If the transaction is aborted, hooks are not called, and are discarded. Calling a hook "consumes" its registration too: hook registrations do not persist across transactions.
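The rules above can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyTransaction is a hypothetical stand-in written for this document, not ZODB's implementation, and it models nothing but the hook bookkeeping (first registered, first called; hooks registering hooks; registrations consumed by a commit and discarded by an abort).

```python
class ToyTransaction:
    """Hypothetical stand-in for a transaction; NOT ZODB's implementation."""

    def __init__(self):
        # (hook, args, kws) triples, kept in registration order
        self._before_commit = []

    def addBeforeCommitHook(self, hook, args=(), kws=None):
        self._before_commit.append((hook, tuple(args), kws or {}))

    def commit(self):
        # A hook may register further hooks, so consume the list from the
        # front instead of iterating over a snapshot of it.
        while self._before_commit:
            hook, args, kws = self._before_commit.pop(0)
            hook(*args, **kws)
        # ... the real commit work would only start here ...

    def abort(self):
        # Aborting discards the pending hooks without calling them.
        del self._before_commit[:]


calls = []
txn = ToyTransaction()


def second():
    calls.append("second")


def first(label):
    calls.append(label)
    txn.addBeforeCommitHook(second)  # registered from inside a hook


txn.addBeforeCommitHook(first, ("first",))
txn.commit()   # calls first(), then second()
txn.commit()   # registrations were consumed: nothing is called

aborted = ToyTransaction()
aborted.addBeforeCommitHook(first, ("never",))
aborted.abort()
aborted.commit()  # the hook was discarded by abort()
```

After running this, calls contains 'first' followed by 'second', and nothing else.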

2.2   Transaction API

The ZODB transaction provides the following public API related to before commit hooks:

>>> def addBeforeCommitHook(hook, args=(), kws=None):
...     """Register a hook to call before the transaction is committed.
...     """
>>>

>>> def getBeforeCommitHooks():
...     """Return iterable producing the registered addBeforeCommit hooks.
...
...     A triple (hook, args, kws) is produced for each registered hook.
...     The hooks are produced in the order in which they would be invoked
...     by a top-level transaction commit.
...     """
>>>

2.3   Examples

Let's check some examples of use extracted from the ZODB tests.

Let's define a hook to call, and a way to see that it was called:

>>> log = []
>>> def reset_log():
...     del log[:]

>>> def hook(arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
...     log.append("arg %r kw1 %r kw2 %r" % (arg, kw1, kw2))

Now register the hook with a transaction:

>>> import transaction
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, '1')

We can see that the hook is indeed registered:

>>> [(hook.func_name, args, kws)
...  for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('1',), {})]

When transaction commit starts, the hook is called, with its arguments:

>>> log
[]
>>> t.commit()
>>> log
["arg '1' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()

A hook's registration is consumed whenever the hook is called. Since the hook above was called, it's no longer registered:

>>> len(list(t.getBeforeCommitHooks()))
0
>>> transaction.commit()
>>> log
[]

The hook is only called for a full commit, not for a savepoint or sub-transaction:

>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, 'A', dict(kw1='B'))
>>> dummy = t.savepoint()
>>> log
[]
>>> sp = t.savepoint(optimistic=True)
>>> log
[]
>>> t.commit()
>>> log
["arg 'A' kw1 'B' kw2 'no_kw2'"]
>>> reset_log()

If a transaction is aborted, no hook is called:

>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, ["OOPS!"])
>>> transaction.abort()
>>> log
[]
>>> transaction.commit()
>>> log
[]

The hook is called before the commit does anything, so even if the commit fails the hook will have been called. To provoke failures in commit, we'll add a failing resource manager to the transaction:

>>> class CommitFailure(Exception):
...     pass
>>> class FailingDataManager:
...     def tpc_begin(self, txn, sub=False):
...         raise CommitFailure
...     def tpc_abort(self, txn):
...         pass
...     def abort(self, txn):
...         pass

>>> t = transaction.begin()
>>> t.join(FailingDataManager())

>>> t.addBeforeCommitHook(hook, '2')
>>> t.commit()
Traceback (most recent call last):
...
CommitFailure
>>> log
["arg '2' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()

Let's register several hooks:

>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, '4', dict(kw1='4.1'))
>>> t.addBeforeCommitHook(hook, '5', dict(kw2='5.2'))

They are returned in the same order by getBeforeCommitHooks:

>>> [(hook.func_name, args, kws)     #doctest: +NORMALIZE_WHITESPACE
...  for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('4',), {'kw1': '4.1'}),
 ('hook', ('5',), {'kw2': '5.2'})]

And commit also calls them in this order:

>>> t.commit()
>>> len(log)
2
>>> log  #doctest: +NORMALIZE_WHITESPACE
["arg '4' kw1 '4.1' kw2 'no_kw2'",
 "arg '5' kw1 'no_kw1' kw2 '5.2'"]
>>> reset_log()

3   CPS before commit hook extensions

3.1   Simple ordering policy for subscriber execution

The ZODB transaction has an implicit policy dealing with the order of execution of its before commit hooks: it is the order in which they have been registered. Despite the fact that you can register several hooks on the transaction, you have no way to order them. In fact, you can rarely predict in which order the hooks will be registered at runtime.

The CPS before commit subscribers manager defines a trivial and sensible ordering policy: each subscriber is registered with an integer order value that determines its position in the execution order.

For instance, a subscriber registered with order=1 will be invoked after another subscriber registered with order=-1 and before another registered with order=2, regardless of which was registered first. When two subscribers are registered with the same order, the first one registered is called first.
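One simple way to obtain this behavior is to sort on the pair (order, registration index). The snippet below is a minimal sketch of that idea only; the names register, registered and execution_order are hypothetical and say nothing about how CPSCore actually stores its subscribers.

```python
import itertools

_registration_index = itertools.count()
registered = []  # entries are (order, registration index, name)


def register(name, order=0):
    # The registration index breaks ties between equal order values in
    # favour of the subscriber that was registered first.
    registered.append((order, next(_registration_index), name))


register("a", order=1)
register("b", order=2)
register("c", order=-1)
register("d", order=1)

# Tuples sort by order first, then by registration index.
execution_order = [name for _order, _index, name in sorted(registered)]
```

Here execution_order is ['c', 'a', 'd', 'b']: "c" has the smallest order, "a" and "d" share order 1 and keep their registration order, and "b" comes last.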

3.2   CPS default subscribers and execution dependency

CPS defines, registers and uses three (3) default subscribers:

o IndexationManager
o TreeCacheManager
o EventSubscriptionsManager

They are executed in the following order:

  1. IndexationManager (order -100)
  2. TreeCacheManager (order 0)
  3. EventSubscriptionsManager (order 100)

Remember this if you want to define, register and use your own subscriber (see last section, it contains an example of such a definition).

Let's explain why we chose this order of execution.

It's pretty clear that the EventSubscriptionsManager should be executed after the IndexationManager and the TreeCacheManager, since this manager deals with the results of all the events that were sent during the transaction. Thus, the EventSubscriptionsManager may be interested in the events that the IndexationManager and the TreeCacheManager send on the channel.

The IndexationManager is called first because the TreeCacheManager might have to query the catalog before rebuilding the tree. This is not supposed to happen on a stock CPS, though.

ATTENTION! An important point here is that the above CPS subscribers are not registered on the ZODB transaction itself but on the CPS before commit subscribers manager. You need to understand that the only before commit hook that CPS adds to the ZODB transaction itself is the CPS before commit subscribers manager, because it is the one defining the order of execution (see section 4 to see why the ZODB transaction doesn't deal with order of execution).
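The delegation described here can be sketched as follows. SketchManager is a hypothetical class written for illustration, not the CPSCore implementation, and a plain list stands in for the hooks held by the ZODB transaction. The point is only that the transaction sees a single callable, which in turn runs the subscribers according to their order.

```python
class SketchManager:
    """Hypothetical stand-in for the CPS before commit subscribers manager."""

    def __init__(self):
        self._subscribers = []  # (order, registration index, callable)
        self._count = 0

    def addSubscriber(self, subscriber, order=0):
        self._subscribers.append((order, self._count, subscriber))
        self._count += 1

    def __call__(self):
        # Invoked once by the transaction; runs the subscribers by order,
        # then by registration index. Registrations are consumed.
        pending = sorted(self._subscribers, key=lambda entry: entry[:2])
        self._subscribers = []
        for _order, _index, subscriber in pending:
            subscriber()


trace = []
manager = SketchManager()
manager.addSubscriber(lambda: trace.append("events"), order=100)
manager.addSubscriber(lambda: trace.append("indexation"), order=-100)
manager.addSubscriber(lambda: trace.append("tree cache"), order=0)

# The transaction only ever holds one hook: the manager itself.
transaction_hooks = [manager]
for hook in transaction_hooks:
    hook()
```

After the loop, trace is ['indexation', 'tree cache', 'events'], which matches the order of the default CPS subscribers listed above.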

3.3   Design

3.3.1   IBeforeCommitSubscriber / BeforeCommitSubscriber

This is the default interface and base class for all before commit subscribers, including the before commit subscribers manager itself (see below).

Here is the IBeforeCommitSubscriber interface definition.

>>> import zope.interface
>>> class IBeforeCommitSubscriber(zope.interface.Interface):
...     """Before commit subscriber interface definition
...
...        Provides a base interface for before hook definitions
...     """
...
...     DEFAULT_SYNC = zope.interface.Attribute('DEFAULT_SYNC',
...     "Default sync mode")
...
...     enabled = zope.interface.Attribute('enable', "Default is True")
...
...     def setSynchronous(sync):
...         """Set queuing mode.
...         """
...
...     def isSynchronous():
...         """Get queuing mode.
...         """
...
...     def __call__():
...         """Called when transaction commits.
...
...         Does the actual manager work.
...         """
...
...     def enable():
...         """Enable the manager
...         """
...
...     def disable():
...         """Disable the manager
...         """

It provides the following behavior:

  • sync mode

    • sync = False (end of the transaction)
    • sync = True (real time)
  • status

    • status = True (enabled)
    • status = False (disabled)
  • __call__()

    • method that does the actual job.

This is what you should implement if you want to define your own subscriber and benefit from the CPSCore integration (sync mode, status, etc.).

I guess this part could be shared at the Zope layer. I'll try to sketch a proposal for Zope 3 someday.
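To make the sync mode and status semantics concrete, here is a minimal sketch of a subscriber following the interface above. SketchSubscriber, its push() method and its processed attribute are assumptions made for this illustration; the real BeforeCommitSubscriber base class may behave differently (for instance, this sketch does not flush the queue when the sync mode changes).

```python
class SketchSubscriber:
    """Hypothetical subscriber; the real BeforeCommitSubscriber may differ."""

    DEFAULT_SYNC = False

    def __init__(self):
        self._sync = self.DEFAULT_SYNC
        self.enabled = True
        self._queue = []
        self.processed = []  # records what has actually been handled

    def setSynchronous(self, sync):
        self._sync = bool(sync)

    def isSynchronous(self):
        return self._sync

    def enable(self):
        self.enabled = True

    def disable(self):
        self.enabled = False

    def push(self, item):
        if not self.enabled:
            return                        # disabled: the request is dropped
        if self._sync:
            self.processed.append(item)   # real time: handled immediately
        else:
            self._queue.append(item)      # delayed until __call__

    def __call__(self):
        # Called at the end of the transaction: flush the queue.
        self.processed.extend(self._queue)
        del self._queue[:]


sub = SketchSubscriber()
sub.push("a")             # queued
sub.disable()
sub.push("b")             # ignored while disabled
sub.enable()
sub.setSynchronous(True)
sub.push("c")             # handled at once
sub()                     # end of transaction: flushes "a"
```

At the end, sub.processed is ['c', 'a']: "c" was handled in real time, "a" only when the subscriber was called, and "b" was never handled.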

3.3.2   BeforeCommitSubscribersManager

  • What is it?

    A manager for CPS that is responsible for executing the before commit subscribers registered by CPS. Note that this manager, and not the ZODB transaction itself, is responsible for executing them. This object is registered as a ZODB before commit hook.

    Multiple subscribers can be registered and will be called in the order they were registered (first registered, first called), except that subscribers registered with different order arguments are invoked from smallest order value to largest. order must be an integer, and defaults to 0.

  • API of the BeforeCommitSubscribersManager class:

    BeforeCommitSubscribersManager implements
    IBeforeCommitSubscriber (see above) and defines the following API:
    
    >>> def addSubscriber(self, subscriber, args=(), kws=None, order=0):
    ...     """Register a subscriber to call before the transaction is
    ...        committed.
    ...
    ...     The specified hook function will be called after the
    ...     transaction's commit method has been called, but before
    ...     the commit process has been started.  The hook will be
    ...     passed the specified positional (`args`) and keyword
    ...     (`kws`) arguments.  `args` is a sequence of positional
    ...     arguments to be passed, defaulting to an empty tuple (no
    ...     positional arguments are passed).  `kws` is a dictionary
    ...     of keyword argument names and values to be passed, or the
    ...     default None (no keyword arguments are passed).
    ...
    ...     Multiple hooks can be registered and will be called in the
    ...     order they were registered (first registered, first
    ...     called), except that hooks registered with different
    ...     `order` arguments are invoked from smallest `order` value
    ...     to largest.  `order` must be an integer, and defaults to
    ...     0.
    ...
    ...     For instance, a hook registered with order=1 will be
    ...     invoked after another hook registered with order=-1 and
    ...     before another registered with order=2, regardless of
    ...     which was registered first.  When two hooks are registered
    ...     with the same order, the first one registered is called
    ...     first.
    ...
    ...     Hooks are called only for a top-level commit.  A
    ...     subtransaction commit or savepoint creation does not call
    ...     any hooks.  If the transaction is aborted, hooks are not
    ...     called, and are discarded.  Calling a hook "consumes" its
    ...     registration too: hook registrations do not persist across
    ...     transactions.  If it's desired to call the same hook on
    ...     every transaction commit, then addSubscriber() must
    ...     be called with that hook during every transaction; in such
    ...     a case consider registering a synchronizer object via a
    ...     TransactionManager's registerSynch() method instead.
    ...     """
    

The CPSCore.commithooks module also provides a helper that creates a BeforeCommitSubscribersManager instance, stores it on the transaction itself, and registers it as a ZODB before commit hook.

>>> from Products.CPSCore.commithooks import \
... get_before_commit_subscribers_manager
>>> txn_manager = get_before_commit_subscribers_manager()

Let's take an example:

>>> log = []
>>> def reset_log():
...     del log[:]

>>> def hook(arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
...     log.append("arg %r kw1 %r kw2 %r" % (arg, kw1, kw2))

>>> from Products.CPSCore.commithooks import \
... get_before_commit_subscribers_manager
>>> mgr = get_before_commit_subscribers_manager()

Register hooks:

>>> mgr.addSubscriber(hook, '1', order=0)
>>> mgr.addSubscriber(hook, '2', order=-999999)
>>> mgr.addSubscriber(hook, '3', order=999999)
>>> mgr.addSubscriber(hook, '4', order=0)
>>> mgr.addSubscriber(hook, '5', order=999999)
>>> mgr.addSubscriber(hook, '6', order=-999999)
>>> mgr.addSubscriber(hook, '7', order=0)

Execute:

>>> mgr()

The hooks are executed by increasing order value and, for equal values, in registration order:

>>> log
["arg '2' kw1 'no_kw1' kw2 'no_kw2'", "arg '6' kw1 'no_kw1' kw2 'no_kw2'", "arg '1' kw1 'no_kw1' kw2 'no_kw2'", "arg '4' kw1 'no_kw1' kw2 'no_kw2'", "arg '7' kw1 'no_kw1' kw2 'no_kw2'", "arg '3' kw1 'no_kw1' kw2 'no_kw2'", "arg '5' kw1 'no_kw1' kw2 'no_kw2'"]

>>> reset_log()

3.3.3   IndexationManager

o Goal

During a transaction we always want a single indexation per object, including the security index update, no matter what happens during this transaction. We assume third-party code can request indexation itself and we want the same result in this case too. ONE SINGLE INDEXATION PER OBJECT PER TRANSACTION, NO MATTER WHAT HAPPENS.

o Introduction

Manager for indexation that can be delayed until commit time. Order of execution: -100. This is the first of the default CPS hooks to be called within a stock CPS instance.

o Get the indexation manager

>>> from Products.CPSCore.IndexationManager import \
... get_indexation_manager
>>> mgr = get_indexation_manager()

o Public API

>>> def push(self, ob, idxs=None, with_security=False):
...     """Add / update an object to reindex to the reindexing queue.
...
...     Copes with security reindexation as well.
...     """

You may find it interesting to check the module's unit tests for practical examples: CPSCore/tests/test_indexation_manager.py
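The "one indexation per object" goal amounts to coalescing repeated push() calls for the same object. The sketch below shows one possible way to do that. SketchIndexationQueue and its flush() method are hypothetical, and the merging rules are assumptions made for this illustration: idxs=None means "no regular index requested", an empty list means "all indexes" (the CMF convention for reindexObject), and the security flag is sticky. The actual IndexationManager may differ.

```python
class SketchIndexationQueue:
    """Hypothetical model of 'one indexation per object per transaction'."""

    def __init__(self):
        self._queue = []   # one entry per object, in first-push order
        self._infos = {}   # id(ob) -> entry, to find an existing entry

    def push(self, ob, idxs=None, with_security=False):
        info = self._infos.get(id(ob))
        if info is None:
            info = {'ob': ob,
                    'idxs': None if idxs is None else list(idxs),
                    'secu': with_security}
            self._infos[id(ob)] = info
            self._queue.append(info)
            return
        # Object already scheduled: merge the new request into its entry.
        info['secu'] = info['secu'] or with_security
        if idxs is None:
            return                      # no additional index requested
        if idxs == [] or info['idxs'] == []:
            info['idxs'] = []           # "all indexes" wins over any subset
        elif info['idxs'] is None:
            info['idxs'] = list(idxs)
        else:
            info['idxs'] = sorted(set(info['idxs']) | set(idxs))

    def flush(self):
        # One (object, idxs, security) triple per object, whatever the
        # number of push() calls made during the transaction.
        done = [(i['ob'], i['idxs'], i['secu']) for i in self._queue]
        self._queue, self._infos = [], {}
        return done


doc = "doc-1"  # any object works; a string keeps the example short
queue = SketchIndexationQueue()
queue.push(doc, idxs=['Title'])
queue.push(doc, with_security=True)
queue.push(doc, idxs=['Description'])
result = queue.flush()
```

Three pushes for the same object yield a single entry: result is [('doc-1', ['Description', 'Title'], True)].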

o CPS Integration

This manager required us to change our infrastructure a bit. CPS proxies inherit from ProxyBase.ProxyBase and redefine the following CMFCore.CMFCatalogAware methods: reindexObject() and reindexObjectSecurity().

We changed the ProxyBase.reindexObject() method this way:

>>> def reindexObject(self, idxs=[]):
...     get_indexation_manager().push(self, idxs=idxs)
>>>

And the reindexObjectSecurity() one:

>>> def reindexObjectSecurity(self):
...     get_indexation_manager().push(self, with_security=True)
>>>

Both methods now just invoke the indexation manager to schedule the indexation of the object.

The methods doing the actual reindexation are now private on the ProxyBase class:

def _reindexObjectSecurity(self, skip_self=False):
    # Implementation

def _reindexObject(self, idxs=[]):
    # Implementation

The indexation manager calls these methods when processing objects that have been pushed for indexation.

3.3.4   TreeCacheManager

  • Goal

    We want a single tree cache rebuild per object per transaction.

  • Introduction

    CPS defines a Trees Tool that caches some information about the site's hierarchies for obvious performance reasons. Trees Tool updates are performed after some events have been sent on the CPS event channel. The same event can be sent several times with different information.

  • CPSCore/TreeCacheManager

    Manager for the tree cache, whose updates can be delayed until commit time. Order of execution: 0 (see section 3.2).

    CPSCore/TreeCacheManager.py
    CPSCore/tests/test_treecache_manager.py
    

XXX to finish

3.4   EventSubscriptionsManager (CPSSubscriptions)

Manager for events whose processing is delayed until commit time. Order of execution: 100 (see section 3.2).

CPSSubscriptions/EventSubscriptionsManager.py
CPSSubscriptions/test_event_subscriptions_manager.py

XXX to finish

4   FAQ

4.1   Which version of ZODB do I have to use?

The implementation of the transaction before commit hook described above is included in release 3.6 of ZODB.

Zope-2.8.x ships with ZODB 3.4.x, but CPS provides a compatibility product that adds the 3.6 implementation to 3.4. See CPSCompat/trunk/PatchZODBTransaction.py.

You can thus benefit from this extension using the latest CPS-3.3.x with Zope-2.8.x.

Why is the before commit hook ordering policy not part of the ZODB core transaction?

Why is the trivial before commit hook subscribers ordering policy that CPS provides not part of the ZODB core transaction?

Hmm... you may well wonder. An endless discussion took place on the zodb-dev mailing list. Check this thread if you're interested:

http://mail.zope.org/pipermail/zodb-dev/2005-August/009090.html

We should be able to propose having a part of the CPS extension at the Zope layer for better code reuse across frameworks on top of Zope.

4.2   How can I get real-time indexation back within my tests?

You can change the sync mode of a subscriber to True:

>>> from Products.CPSCore.IndexationManager import get_indexation_manager
>>> get_indexation_manager().setSynchronous(True)

After this the indexation will behave as if no subscriber had been registered for the indexation.

Note that you can do this for all the subscribers implementing IBeforeCommitSubscriber.

4.3   How can I deactivate a given transaction subscriber?

You can disable a subscriber for a certain time within the transaction.

For instance, here is how to deactivate indexation during part of the transaction:

>>> from Products.CPSCore.IndexationManager import get_indexation_manager
>>> get_indexation_manager().disable()

From here on, no object is scheduled for indexation anymore. This means that the indexation that would otherwise have occurred does not take place during this period:

>>> get_indexation_manager().enable()

From here on, the subscriber queues objects again and processes them for indexation.

4.4   How can I define, register and use my own subscriber?

You can of course define and register your own subscribers.

Define it. It implements IBeforeCommitSubscriber:

>>> from Products.CPSCore.interfaces import IBeforeCommitSubscriber
>>> from Products.CPSCore.commithooks import BeforeCommitSubscriber
>>> import zope.interface

This manager will be executed just before the IndexationManager (see section 3 for the orders defined by CPS). Its order will be -101:

>>> _TXN_MGR_ORDER = -101

>>> class MessageManager(BeforeCommitSubscriber):
...     zope.interface.implements(IBeforeCommitSubscriber)
...
...     def __init__(self, mgr):
...         """Initialize and register this manager within the
...         transaction.
...         Here, we specified the order of this subscriber
...         """
...         BeforeCommitSubscriber.__init__(self, mgr, order=_TXN_MGR_ORDER)
...         self._queue = []
...
...     def push(self, str_):
...         """Push the string within the queue."""
...         if not self.enabled:
...             return
...         if self._sync:
...             print str_
...         if str_ not in self._queue:
...             self._queue.append(str_)
...
...     def __call__(self):
...         # do the actual async job
...         print ' '.join(self._queue)
>>>

As we saw in section 3, the CPS BeforeCommitSubscribersManager is responsible for executing the CPS before commit hooks. Thus we need to register the MessageManager with the CPS commithooks:

>>> from Products.CPSCore.commithooks import \
... get_before_commit_subscribers_manager

We also need to create an instance of the manager at some point. We choose to store it on the transaction itself. Don't forget that it's a non-persistent object. We will store this instance under the following id:

>>> _TXN_MY_MANAGER = '_cps_my_manager'

We need to define a function that registers this hook with the CPS before commit subscribers manager and creates an instance of the manager on the transaction itself:

>>> import transaction
>>> def get_message_manager():
...     """Get the manager.
...
...     Creates it *if* needed.
...     """
...     txn = transaction.get()
...     mgr = getattr(txn, _TXN_MY_MANAGER, None)
...     if mgr is None:
...         mgr = MessageManager(get_before_commit_subscribers_manager())
...         setattr(txn, _TXN_MY_MANAGER, mgr)
...     # Return the manager whether it was just created or already there
...     return mgr
>>>

Now, you can use it within your code. To get the manager let's use the helper function:

>>> txn_manager = get_message_manager()
>>> isinstance(txn_manager, MessageManager)
True

You can then push strings within your manager:

>>> txn_manager.push('CPS')
>>> txn_manager.push('rocks')

You can deactivate the manager during the transaction:

>>> txn_manager.disable()
>>> txn_manager.push('sucks')

You can re-enable it and then continue using it:

>>> txn_manager.enable()
>>> txn_manager.push('!')

Then, when the transaction is committed, the manager is executed:

>>> transaction.commit()
CPS rocks !

You can use the Manager in real time:

>>> txn_manager = get_message_manager()

To change the sync mode use this API:

>>> txn_manager.setSynchronous(True)
<BLANKLINE>

Pushing then displays the result directly:

>>> txn_manager.push('CPS is great !')
CPS is great !

4.5   Can I launch non transactional operations using before commit hook subscribers?

You cannot execute non-transactional code using before commit hook subscribers, since the subscribers are executed at the end of the transaction, not after it: the commit itself can still fail once they have run. ZODB should be supporting another API soon for this kind of use case. Think about committing to a non-transactional RDF database, for instance.
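The problem can be shown with a toy model. Everything in the sketch below is hypothetical (toy_commit, write_outside and external_store are illustration names, not ZODB or CPS APIs); it only reproduces the sequence already seen in section 2.3, where a hook runs and the commit fails afterwards.

```python
class CommitFailure(Exception):
    pass


external_store = []  # stands in for a non-transactional system


def write_outside(value):
    external_store.append(value)  # cannot be rolled back


def toy_commit(hooks, fail=False):
    # Before commit hooks run first...
    for hook, args in hooks:
        hook(*args)
    # ...and only then does the commit proper start, which may fail.
    if fail:
        raise CommitFailure("storage refused the commit")


try:
    toy_commit([(write_outside, ("doc-1",))], fail=True)
except CommitFailure:
    pass
```

After the failed commit, external_store still contains 'doc-1' although nothing was committed, so the external system is now out of step with the transactional data.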