Logo Search packages:      
Sourcecode: magicicada version File versions  Download package

def magicicada::tests::test_syncdaemon::MetaQueueChangedTests::test_mq_polling_untilfinish (   self  ) 

Check that it polls mq until no more is needed.

Definition at line 420 of file test_syncdaemon.py.

00420                                          :
        """Check that it polls mq until no more is needed."""
        d2 = dict(name='QUEUE_MANAGER', queues='WORKING_ON_CONTENT',
                  description='description', is_error='', is_connected='True',
                  is_online='', connection='conn')

        # set the callback, and adjust the polling time to faster
        calls = []
        def fake_get(*a):
            """Fake get."""
            calls.append(None)
            if len(calls) < 3:
                pass  # no changes, should keep calling
            elif len(calls) == 3:
                self.sd.on_sd_status_changed('QUEUE_MANAGER', 'description',
                                             False, True, False,
                                             'WORKING_ON_CONTENT', 'connect')

                # allow time to see if a mistaken call happens
                reactor.callLater(.5, deferred.callback, True)
            else:
                deferred.errback(ValueError("Too many calls"))
            return defer.succeed("foo")

        self.sd.dbus.get_meta_queue = fake_get
        self.sd._mq_poll_time = .1

        self.sd.on_sd_status_changed('QUEUE_MANAGER', 'description', False,
                                     True, False, 'WORKING_ON_BOTH',
                                     'connection')
        deferred = defer.Deferred()
        return deferred

    def test_MQ_state_nothing(self):


Generated by  Doxygen 1.6.0   Back to index