# test_syncdaemon.py

# Tests for the SyncDaemon interface
#
# Author: Facundo Batista <facundo@taniquetil.com.ar>
#
# Copyright 2010 Chicharreros
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the SyncDaemon communication backend."""

import logging
import unittest

from magicicada.syncdaemon import SyncDaemon, State
from magicicada.tests.helpers import MementoHandler

from twisted.trial.unittest import TestCase as TwistedTestCase
from twisted.internet import defer, reactor


class FakeDBusInterface(object):
    """Fake DBus Interface, for SD to not use dbus at all during tests.

    The data-returning methods answer with already-fired Deferreds that
    carry canned values.
    """

    # Value reported by is_sd_started(); tests flip it on the class to
    # simulate SyncDaemon being (or not being) already running.
    fake_sd_started = False

    def __init__(self, sd):
        """Accept the ``sd`` argument and ignore it."""

    def shutdown(self):
        """Do nothing: there is no real DBus connection to release."""

    def get_status(self):
        """Fake status."""
        return defer.succeed(('fakename', 'fakedescrip', False, True,
                              False, 'fakequeues', 'fakeconnection'))

    def get_folders(self):
        """Fake folders."""
        return defer.succeed('fakedata')

    # all the remaining calls answer exactly like get_folders
    get_content_queue = get_meta_queue = get_folders
    start = quit = connect = disconnect = get_folders
    get_shares_to_me = get_shares_to_others = get_folders

    def is_sd_started(self):
        """Fake response."""
        return self.fake_sd_started


class BaseTest(TwistedTestCase):
    """Base test with a SD.

    Subclasses get a fresh ``self.sd`` per test, built on top of the fake
    DBus interface, and it is shut down when the test ends.
    """

    # trial's per-test time limit, in seconds
    timeout = 1

    def setUp(self):
        """Build the SyncDaemon to be exercised by the test."""
        self.sd = SyncDaemon(FakeDBusInterface)

    def tearDown(self):
        """Shut down the SyncDaemon built in setUp."""
        self.sd.shutdown()


class InitialDataTests(unittest.TestCase):
    """Tests for initial data gathering."""

    def setUp(self):
        """Remember the fake 'SD started' flag, as some tests change it."""
        self._orig_sd_started = FakeDBusInterface.fake_sd_started

    def tearDown(self):
        """Put the fake 'SD started' flag back, to not affect other tests."""
        FakeDBusInterface.fake_sd_started = self._orig_sd_started

    def _sd_recording_initial_data(self):
        """Return a SD, and the list where _get_initial_data calls land.

        The method is replaced on the instance only, after it was built.
        """
        recorded = []
        daemon = SyncDaemon(FakeDBusInterface)
        daemon._get_initial_data = lambda: recorded.append(True)
        return daemon, recorded

    def _build_recording_initial_data(self):
        """Build a SD with _get_initial_data replaced at class level.

        Return the list of calls recorded while instantiating.  The real
        method is always restored, even if the instantiation blows up.
        """
        recorded = []
        real_method = SyncDaemon._get_initial_data
        SyncDaemon._get_initial_data = lambda s: recorded.append(True)
        try:
            SyncDaemon(FakeDBusInterface)
        finally:
            SyncDaemon._get_initial_data = real_method
        return recorded

    def test_called_by_start(self):
        """Check that start calls get initial data."""
        daemon, recorded = self._sd_recording_initial_data()
        daemon.start()
        self.assertTrue(recorded)

    def test_called_by_nameownerchanged_no(self):
        """Check that it is called when discover that sd started."""
        daemon, recorded = self._sd_recording_initial_data()
        daemon.on_sd_name_owner_changed(True)
        self.assertTrue(recorded)

    def test_called_by_nameownerchanged_yes(self):
        """Check that it is not called when discover that sd stopped."""
        daemon, recorded = self._sd_recording_initial_data()
        daemon.on_sd_name_owner_changed(False)
        self.assertFalse(recorded)

    def test_called_beggining_no(self):
        """Check that it should not be called if no SD."""
        # be explicit, do not depend on what other tests left in the flag
        FakeDBusInterface.fake_sd_started = False
        self.assertFalse(self._build_recording_initial_data())

    def test_called_beggining_yes(self):
        """Check that it should be called if SD already started."""
        FakeDBusInterface.fake_sd_started = True
        self.assertTrue(self._build_recording_initial_data())

    def test_calls_callbacks(self):
        """Check that initial data calls the callbacks for new data."""
        recorded = []

        def note_call(*a):
            """Record that a callback was called."""
            recorded.append(True)

        daemon = SyncDaemon(FakeDBusInterface)
        daemon.status_changed_callback = note_call
        daemon.content_queue_changed_callback = note_call
        daemon.meta_queue_changed_callback = note_call

        daemon._get_initial_data()
        self.assertEqual(len(recorded), 3)


class StatusChangedTests(BaseTest):
    """Simple signals checking."""

    def assert_state(self, **expected):
        """Check that the SD current state holds the *expected* values."""
        state = self.sd.current_state
        for attr in sorted(expected):
            found = getattr(state, attr)
            wanted = expected[attr]
            self.assertEqual(found, wanted, "current_state.%s: %r != %r" %
                                            (attr, found, wanted))

    def _is_started_at_init(self, sd_started):
        """Return is_started from a SD built with the fake flag set.

        The fake flag is put back as it was, and the extra SD is shut down,
        to not leave anything behind for the rest of the tests.
        """
        previous = FakeDBusInterface.fake_sd_started
        FakeDBusInterface.fake_sd_started = sd_started
        try:
            daemon = SyncDaemon(FakeDBusInterface)
            try:
                return daemon.current_state.is_started
            finally:
                daemon.shutdown()
        finally:
            FakeDBusInterface.fake_sd_started = previous

    @defer.inlineCallbacks
    def test_initial_value(self):
        """Fills the status info initially."""
        asked = []

        def fake():
            """Fake method."""
            asked.append(True)
            status = ('fakename', 'fakedescrip', False, True,
                      False, 'fakequeues', 'fakeconnection')
            return defer.succeed(status)

        self.sd.dbus.get_status = fake
        yield self.sd._get_initial_data()
        self.assertTrue(asked)

    def test_statuschanged(self):
        """Test StatusChanged signal."""
        deferred = defer.Deferred()
        sent = ('name', 'description', False, True,
                False, 'queues', 'connection')

        def callback(name, description, is_error, is_connected, is_online,
                     queues, connection):
            """Check received data."""
            received = (name, description, is_error, is_connected,
                        is_online, queues, connection)
            self.assertEqual(received, sent)
            deferred.callback(True)

        self.sd.status_changed_callback = callback
        self.sd.on_sd_status_changed(*sent)
        return deferred

    def test_status_changed_affects_cuurent_status(self):
        """Makes changes to see how status are reflected."""
        # one set of values
        self.sd.on_sd_status_changed('name1', 'description1', False, True,
                                     False, 'queues1', 'connection1')
        self.assert_state(name='name1', description='description1',
                          is_error=False, is_connected=True,
                          is_online=False, queues='queues1',
                          connection='connection1')

        # again, to be sure they actually are updated
        self.sd.on_sd_status_changed('name2', 'description2', True, False,
                                     True, 'queues2', 'connection2')
        self.assert_state(name='name2', description='description2',
                          is_error=True, is_connected=False,
                          is_online=True, queues='queues2',
                          connection='connection2')

    def test_is_started_fixed_at_init_no(self):
        """Status.is_started is set at init time, to no."""
        self.assertFalse(self._is_started_at_init(False))

    def test_is_started_fixed_at_init_yes(self):
        """Status.is_started is set at init time, to yes."""
        self.assertTrue(self._is_started_at_init(True))

    def test_on_stopped(self):
        """Stopped affects the status."""
        self.sd.on_sd_name_owner_changed(False)
        self.assert_state(name='STOPPED',
                          description='ubuntuone-client is stopped',
                          is_error=False, is_connected=False,
                          is_online=False, queues='', connection='')

    def test_on_started(self):
        """Started affects the status."""
        # make _get_initial_data dummy to not affect test
        self.sd._get_initial_data = lambda: None

        self.sd.on_sd_name_owner_changed(True)
        self.assert_state(name='STARTED',
                          description='ubuntuone-client just started',
                          is_error=False, is_connected=False,
                          is_online=False, queues='', connection='')


class ContentQueueChangedTests(BaseTest):
    """Check the ContentQueueChanged handling."""

    def _signal_content_queue(self, items):
        """Make the fake DBus answer *items* and signal that the CQ changed.

        A new list is handed out on every request, so the SD never shares
        a list object with the test.
        """
        self.sd.dbus.get_content_queue = lambda: defer.succeed(list(items))
        self.sd.on_sd_content_queue_changed()

    @defer.inlineCallbacks
    def test_initial_value(self):
        """Fills the content queue info initially."""
        asked = []
        self.sd.dbus.get_content_queue = lambda: asked.append(True)
        yield self.sd._get_initial_data()
        self.assertTrue(asked)

    def test_without_setting_callback(self):
        """It should work even if not hooking into the callback."""
        self._signal_content_queue(['foo'])

    def test_callback_call(self):
        """Call the callback."""
        received = []
        self.sd.content_queue_changed_callback = received.append
        self._signal_content_queue(['foo'])
        self.assertTrue(received)

    def test_callback_call_twice_different(self):
        """Call the callback twice for different info."""
        received = []
        self.sd.content_queue_changed_callback = received.append

        # first call
        self._signal_content_queue(['foo'])
        self.assertEqual(received, [['foo']])

        # second call, different info
        self._signal_content_queue(['bar'])
        self.assertEqual(received, [['foo'], ['bar']])

    def test_callback_call_twice_same(self):
        """Call the callback once, even getting twice the same info."""
        received = []
        self.sd.content_queue_changed_callback = received.append

        # first call
        self._signal_content_queue(['foo'])
        self.assertEqual(received, [['foo']])

        # second call, same info: nothing new should be received
        self._signal_content_queue(['foo'])
        self.assertEqual(received, [['foo']])

    def test_CQ_state_nothing(self):
        """Check the ContentQueue info, being nothing."""
        self._signal_content_queue([])
        self.assertEqual(self.sd.content_queue, [])

    def test_CQ_state_one(self):
        """Check the ContentQueue info, being one."""
        self._signal_content_queue(['foo'])
        self.assertEqual(self.sd.content_queue, ['foo'])

    def test_CQ_state_two(self):
        """Check the ContentQueue info, two."""
        self._signal_content_queue(['foo', 'bar'])
        self.assertEqual(self.sd.content_queue, ['foo', 'bar'])


class MetaQueueChangedTests(BaseTest):
    """Check the MetaQueueChanged handling."""

    # the polling tests wait on the reactor, give them more room
    timeout = 3

    def setUp(self):
        """Set up, leaving the SD state as working on metadata."""
        BaseTest.setUp(self)
        self.sd.current_state._set(queues='WORKING_ON_METADATA')

    def _check_mq_getting(self, items):
        """Make the fake DBus answer *items* as meta queue and check the MQ.

        A new list is handed out on every request, so the SD never shares
        a list object with the test.
        """
        self.sd.dbus.get_meta_queue = lambda: defer.succeed(list(items))
        self.sd._check_mq()

    def _polled_after_status(self, name, queues):
        """Send a status change; return a deferred fired when MQ is asked."""
        deferred = defer.Deferred()

        def fake():
            """Fake."""
            deferred.callback(True)
            return defer.succeed("foo")

        self.sd.dbus.get_meta_queue = fake
        self.sd.on_sd_status_changed(name, 'description', False, True,
                                     False, queues, 'connection')
        return deferred

    @defer.inlineCallbacks
    def test_initial_value(self):
        """Fills the meta queue info initially."""
        asked = []
        self.sd.dbus.get_meta_queue = lambda: asked.append(True)
        yield self.sd._get_initial_data()
        self.assertTrue(asked)

    def test_callback_call(self):
        """Call the callback."""
        received = []
        self.sd.meta_queue_changed_callback = received.append
        self._check_mq_getting(['foo'])
        self.assertTrue(received)

    def test_callback_call_twice_different(self):
        """Call the callback twice for different info."""
        received = []
        self.sd.meta_queue_changed_callback = received.append

        # first call
        self._check_mq_getting(['foo'])
        self.assertEqual(received, [['foo']])

        # second call, different info
        self._check_mq_getting(['bar'])
        self.assertEqual(received, [['foo'], ['bar']])

    def test_callback_call_twice_same(self):
        """Call the callback once, even getting twice the same info."""
        received = []
        self.sd.meta_queue_changed_callback = received.append

        # first call
        self._check_mq_getting(['foo'])
        self.assertEqual(received, [['foo']])

        # second call, same info: nothing new should be received
        self._check_mq_getting(['foo'])
        self.assertEqual(received, [['foo']])

    def test_polling_initiated_state_changed(self):
        """The polling initiates when state changes."""
        checked = []
        self.sd._check_mq = lambda: checked.append(True)

        # send some status changed
        self.sd.on_sd_status_changed('name', 'description', False, True,
                                     False, 'queues', 'connection')
        self.assertTrue(checked)

    def test_mq_polling_workinginmetadata_notinqueuemanager(self):
        """Check that it polls mq while working in metadata not being in QM."""
        return self._polled_after_status('AUTHENTICATE',
                                         'WORKING_ON_METADATA')

    def test_mq_polling_workinginmetadata_queuemanager(self):
        """Check that it polls mq while working in metadata being in QM."""
        return self._polled_after_status('QUEUE_MANAGER',
                                         'WORKING_ON_METADATA')

    def test_mq_polling_workinginboth(self):
        """Check that it polls mq while working in both."""
        return self._polled_after_status('QUEUE_MANAGER', 'WORKING_ON_BOTH')

    def test_mq_polling_untilfinish(self):
        """Check that it polls mq until no more is needed."""
        # this must exist before the SD has any chance of calling fake_get
        deferred = defer.Deferred()
        calls = []

        def fake_get(*a):
            """Fake get."""
            calls.append(None)
            # the first two calls change nothing, it should keep calling
            if len(calls) == 3:
                # no more metadata work: the polling should stop here
                self.sd.on_sd_status_changed('QUEUE_MANAGER', 'description',
                                             False, True, False,
                                             'WORKING_ON_CONTENT', 'connect')

                # allow time to see if a mistaken call happens
                reactor.callLater(.5, deferred.callback, True)
            elif len(calls) > 3:
                deferred.errback(ValueError("Too many calls"))
            return defer.succeed("foo")

        # set the callback, and adjust the polling time to faster
        self.sd.dbus.get_meta_queue = fake_get
        self.sd._mq_poll_time = .1

        self.sd.on_sd_status_changed('QUEUE_MANAGER', 'description', False,
                                     True, False, 'WORKING_ON_BOTH',
                                     'connection')
        return deferred

    def test_MQ_state_nothing(self):
        """Check the MetaQueue info, being nothing."""
        self._check_mq_getting([])
        self.assertEqual(self.sd.meta_queue, [])

    def test_MQ_state_one(self):
        """Check the MetaQueue info, being one."""
        self._check_mq_getting(['foo'])
        self.assertEqual(self.sd.meta_queue, ['foo'])

    def test_MQ_state_two(self):
        """Check the MetaQueue info, two."""
        self._check_mq_getting(['foo', 'bar'])
        self.assertEqual(self.sd.meta_queue, ['foo', 'bar'])


class StateTests(unittest.TestCase):
    """Test State class."""

    def test_initial(self):
        """Initial state for vals."""
        state = State()
        self.assertEqual(state.name, '')

    def test_set_one_value(self):
        """Set one value."""
        state = State()
        state._set(name=55)

        # only the name changed, the description is still the initial one
        self.assertEqual(state.name, 55)
        self.assertEqual(state.description, '')

    def test_set_two_values(self):
        """Set two values."""
        state = State()
        state._set(name=55, description=77)

        # both were stored, and is_error was left untouched
        self.assertEqual(state.name, 55)
        self.assertEqual(state.description, 77)
        self.assertFalse(state.is_error)

    def test_bad_value(self):
        """Set a value that should not."""
        state = State()
        self.assertRaises(AttributeError, state._set, not_really_allowed=44)


class APITests(unittest.TestCase):
    """Check exposed methods and attributes."""

    def setUp(self):
        """Set up the test."""
        self.sd = SyncDaemon(FakeDBusInterface)

        # (obj, name, original) for everything patched by mpatch_called
        self._replaced = []
        self.called = False

    def tearDown(self):
        """Tear down the test."""
        # undo the patches, last one first, before shutting down
        while self._replaced:
            setattr(*self._replaced.pop())
        self.sd.shutdown()

    def _flag(self, *a, **k):
        """Flag that it was called, whatever the arguments are."""
        self.called = True

    def _status_changed(self, is_connected, is_online):
        """Signal a status change varying only connected and online."""
        self.sd.on_sd_status_changed('name', 'description', False,
                                     is_connected, is_online,
                                     'queues', 'connection')

    def mpatch_called(self, obj, method_name):
        """Monkeypatch to flag called; it's restored on tear down."""
        original = getattr(obj, method_name)
        self._replaced.append((obj, method_name, original))
        setattr(obj, method_name, self._flag)

    def flag_called(self, obj, method_name):
        """Replace callback to flag called."""
        setattr(obj, method_name, self._flag)

    def test_is_started_yes(self):
        """Check is_started, True."""
        # simulate the signal that indicates the name was registered
        self.sd.on_sd_name_owner_changed(True)
        self.assertTrue(self.sd.current_state.is_started)

    def test_is_started_no(self):
        """Check is_started, False."""
        self.sd.on_sd_name_owner_changed(False)
        self.assertFalse(self.sd.current_state.is_started)

    def test_start(self):
        """Test start calls SD."""
        self.mpatch_called(self.sd.dbus, 'start')
        self.sd.start()
        self.assertTrue(self.called)

    def test_quit(self):
        """Test quit calls SD."""
        self.mpatch_called(self.sd.dbus, 'quit')
        self.sd.quit()
        self.assertTrue(self.called)

    def test_connect(self):
        """Test connect calls SD."""
        self.mpatch_called(self.sd.dbus, 'connect')
        self.sd.connect()
        self.assertTrue(self.called)

    def test_disconnect(self):
        """Test disconnect calls SD."""
        self.mpatch_called(self.sd.dbus, 'disconnect')
        self.sd.disconnect()
        self.assertTrue(self.called)

    def test_on_started(self):
        """Called when SD started."""
        self.flag_called(self.sd, 'on_started_callback')
        self.sd.on_sd_name_owner_changed(True)
        self.assertTrue(self.called)

    def test_on_stopped(self):
        """Called when SD stopped."""
        self.flag_called(self.sd, 'on_stopped_callback')
        self.sd.on_sd_name_owner_changed(False)
        self.assertTrue(self.called)

    def test_on_connected(self):
        """Called when SD connected."""
        self.flag_called(self.sd, 'on_connected_callback')

        # first signal with connected in True
        self._status_changed(is_connected=True, is_online=False)
        self.assertTrue(self.called)

    def test_on_disconnected(self):
        """Called when SD disconnected."""
        self.flag_called(self.sd, 'on_disconnected_callback')

        # connect and disconnect
        self._status_changed(is_connected=True, is_online=False)
        self._status_changed(is_connected=False, is_online=False)
        self.assertTrue(self.called)

    def test_on_online(self):
        """Called when SD goes online."""
        self.flag_called(self.sd, 'on_online_callback')

        # first signal with online in True
        self._status_changed(is_connected=True, is_online=True)
        self.assertTrue(self.called)

    def test_on_offline(self):
        """Called when SD goes offline."""
        self.flag_called(self.sd, 'on_offline_callback')

        # go online and then offline
        self._status_changed(is_connected=True, is_online=True)
        self._status_changed(is_connected=True, is_online=False)
        self.assertTrue(self.called)



class TestLogs(unittest.TestCase):
    """Test logging."""

    def setUp(self):
        """Set up, hooking a memento handler before the SD is built."""
        self.hdlr = MementoHandler()
        self.hdlr.setLevel(logging.DEBUG)
        self._logger = logging.getLogger('magicicada.syncdaemon')
        self._logger.addHandler(self.hdlr)
        self.sd = SyncDaemon(FakeDBusInterface)

    def tearDown(self):
        """Shut down!"""
        try:
            self.sd.shutdown()
        finally:
            # otherwise one more handler stays in the logger per test run
            self._logger.removeHandler(self.hdlr)

    def assert_logged_info(self, message):
        """Check that *message* was logged, according to check_inf."""
        self.assertTrue(self.hdlr.check_inf(message))

    def _set_queue_manager_state(self, queues):
        """Put the SD state in QUEUE_MANAGER with the given *queues*."""
        self.sd.current_state._set(name='QUEUE_MANAGER', queues=queues)

    def test_instancing(self):
        """Just logged SD instancing."""
        self.assert_logged_info("SyncDaemon interface started!")

    def test_shutdown(self):
        """Log when SD shutdowns."""
        self.sd.shutdown()
        self.assert_logged_info("SyncDaemon interface going down")

    def test_initial_value(self):
        """Log the initial filling."""
        # This is not a trial test case, so a returned deferred would be
        # ignored and a failed assertion inside it never reported; the
        # check is done synchronously instead.
        self.sd._get_initial_data()
        self.assert_logged_info("Getting initial data")

    def test_start(self):
        """Log the call to start."""
        self.sd.start()
        self.assert_logged_info("Starting u1.SD")

    def test_quit(self):
        """Log the call to quit."""
        self.sd.quit()
        self.assert_logged_info("Stopping u1.SD")

    def test_connect(self):
        """Log the call to connect."""
        self.sd.connect()
        self.assert_logged_info("Telling u1.SD to connect")

    def test_disconnect(self):
        """Log the call to disconnect."""
        self.sd.disconnect()
        self.assert_logged_info("Telling u1.SD to disconnect")

    def test_check_mq_true(self):
        """Log the MQ check when it asks for info."""
        self._set_queue_manager_state('WORKING_ON_METADATA')
        self.sd._check_mq()
        self.assert_logged_info("Asking for MQ information")

    def test_check_mq_noreally(self):
        """Log the MQ check when it should not work."""
        self._set_queue_manager_state('WORKING_ON_CONTENT')
        self.sd._check_mq()
        self.assert_logged_info("Check MQ called but States not in MQ")

    def test_meta_queue_changed(self):
        """Log that MQ has new data."""
        self.sd.dbus.get_meta_queue = lambda: defer.succeed(['foo'])
        self._set_queue_manager_state('WORKING_ON_METADATA')
        self.sd._check_mq()
        self.assert_logged_info("SD Meta Queue changed: 1 items")

    def test_content_queue_changed(self):
        """Log that process_cq has new data."""
        self.sd.dbus.get_content_queue = lambda: defer.succeed(['foo'])
        self.sd.on_sd_content_queue_changed()
        self.assert_logged_info("SD Content Queue changed")
        self.assert_logged_info("Content Queue info is new! 1 items")

    def test_on_status_changed(self):
        """Log status changed."""
        self.sd.on_sd_status_changed('name', 'description', False, True,
                                     False, 'queues', 'connection')
        self.assert_logged_info("SD Status changed")
        expected_debug = (
            "    new status: name=u'name', "
            "description=u'description', is_error=False, is_connected=True, "
            "is_online=False, queues=u'queues', connection=u'connection'")
        self.assertTrue(self.hdlr.check_dbg(expected_debug))

    def test_folders_changed(self):
        """Log when folders changed."""
        self.sd.on_sd_folders_changed()
        self.assert_logged_info("SD Folders changed")

    def test_shares_changed(self):
        """Log when shares changed."""
        self.sd.on_sd_shares_changed()
        self.assert_logged_info("SD Shares changed")

    def test_on_name_owner_changed(self):
        """Log name owner changed."""
        self.sd.on_sd_name_owner_changed(True)
        self.assert_logged_info("SD Name Owner changed: True")


class FoldersTests(BaseTest):
    """Folders checking."""

    def _record_get_folders(self):
        """Replace the fake DBus get_folders; return the list of its calls."""
        asked = []
        self.sd.dbus.get_folders = lambda: asked.append(True)
        return asked

    def test_foldercreated_callback(self):
        """Gets the new data after the folders changed."""
        asked = self._record_get_folders()

        # they changed!
        self.sd.on_sd_folders_changed()

        self.assertTrue(asked)

    @defer.inlineCallbacks
    def test_initial_value(self):
        """Fills the folder info initially."""
        asked = self._record_get_folders()
        yield self.sd._get_initial_data()
        self.assertTrue(asked)

    def test_folder_changed_callback(self):
        """Test that the GUI callback is called."""
        notified = []

        def gui_callback(*a):
            """Record that the GUI was told about the change."""
            notified.append(True)

        self.sd.on_folders_changed_callback = gui_callback

        # they changed!
        self.sd.on_sd_folders_changed()

        self.assertTrue(notified)


class SharesTests(BaseTest):
    """Shares checking."""

    def _record_get_shares(self):
        """Replace both fake DBus shares getters; return the list of calls."""
        asked = []
        self.sd.dbus.get_shares_to_me = lambda: asked.append(True)
        self.sd.dbus.get_shares_to_others = lambda: asked.append(True)
        return asked

    def _record_gui_callbacks(self):
        """Hook both GUI callbacks; return the list where they leave a mark.

        The 'to me' callback appends 1 and the 'to others' one appends 2.
        """
        marks = []
        self.sd.on_shares_to_me_changed_callback = lambda *a: marks.append(1)
        self.sd.on_shares_to_others_changed_callback = (
            lambda *a: marks.append(2))
        return marks

    def test_shares_changed_callback(self):
        """Gets the new data after the shares changed."""
        asked = self._record_get_shares()

        # they changed!
        self.sd.on_sd_shares_changed()

        self.assertEqual(len(asked), 2)

    @defer.inlineCallbacks
    def test_initial_value(self):
        """Fills the shares info initially."""
        asked = self._record_get_shares()
        yield self.sd._get_initial_data()
        self.assertEqual(len(asked), 2)

    def test_shares_to_me_changed_callback(self):
        """Test that only the GUI callback for shares to me is called."""
        marks = self._record_gui_callbacks()

        # only "to me" will differ from what the fake dbus returns
        self.sd.shares_to_me = 'foo'
        self.sd.shares_to_others = 'fakedata'
        self.sd.on_sd_shares_changed()

        self.assertEqual(marks, [1])

    def test_shares_to_others_changed_callback(self):
        """Test that only the GUI callback for shares to others is called."""
        marks = self._record_gui_callbacks()

        # only "to others" will differ from what the fake dbus returns
        self.sd.shares_to_others = 'foo'
        self.sd.shares_to_me = 'fakedata'
        self.sd.on_sd_shares_changed()

        self.assertEqual(marks, [2])

# Generated by Doxygen 1.6.0