Logo Search packages:      
Sourcecode: magicicada version File versions  Download package

test_dbusiface.py

# Tests for the DBus interface
#
# Author: Facundo Batista <facundo@taniquetil.com.ar>
#
# Copyright 2010 Chicharreros
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Tests for the DBus interface towards real syncdaemon."""

import logging

import dbus
from twisted.trial.unittest import TestCase as TwistedTestCase
from twisted.internet import defer

from magicicada import dbusiface
from magicicada.tests.helpers import MementoHandler


class FakeSessionBus(object):
    """Stand-in for the DBus session bus that keeps receivers in a dict.

    Receivers live in ``_callbacks`` keyed by
    ``(dbus_interface, signal_name)``; ``fake_name_owner`` decides what
    ``get_name_owner`` answers.
    """

    def __init__(self, **kwargs):
        # keyword arguments are accepted but not used
        self.fake_name_owner = "foo"
        self._callbacks = {}

    def add_signal_receiver(self, method, dbus_interface, signal_name):
        """Store method as the receiver for the interface/signal pair."""
        key = (dbus_interface, signal_name)
        self._callbacks[key] = method

    def remove_signal_receiver(self, match, dbus_interface, signal_name):
        """Drop the receiver stored for the interface/signal pair."""
        key = (dbus_interface, signal_name)
        del self._callbacks[key]

    def get_name_owner(self, name):
        """Return the configured owner, or raise it when it is not a str."""
        assert name == 'com.ubuntuone.SyncDaemon'
        owner = self.fake_name_owner
        if not isinstance(owner, str):
            # anything that is not a string is treated as an exception
            raise owner
        return owner


class CallLoguer(object):
    """Record the last public method invoked on the instance.

    Any attribute whose name does not start with an underscore resolves to
    a callable that stores ``(name, args)`` in ``_called_method``.
    """

    def __init__(self):
        self._fake_response = None
        self._called_method = None, ()

    def __getattribute__(self, name):
        """Look up underscore names normally; fake everything else."""
        if name[0] != "_":
            def recorder(*args):
                self._called_method = (name, args)
                canned = self._fake_response
                if canned is None:
                    # a deferred does no harm here, and callers may need one
                    return defer.Deferred()
                expected_name, answer = canned
                assert expected_name == name
                return answer
            return recorder
        return object.__getattribute__(self, name)


class FakeSDTool(CallLoguer):
    """Replacement for the real SyncDaemonTool that only records calls."""

    def __init__(self, _):
        # the single constructor argument is not used
        super(FakeSDTool, self).__init__()


class FakeSyncDaemon(CallLoguer):
    """Replacement for Magicicada's SyncDaemon that only records calls."""


class SafeTests(TwistedTestCase):
    """Base class for tests that never touch the real DBus or SyncDaemon."""

    def setUp(self):
        """Swap the DBus collaborators for fakes and build the interface.

        The fakes are installed with trial's ``patch`` so the original
        ``dbusiface`` attributes are restored when each test finishes,
        instead of leaking into whatever runs afterwards.
        """
        self.patch(dbusiface, 'SessionBus', FakeSessionBus)
        self.patch(dbusiface, 'SyncDaemonTool', FakeSDTool)
        self.fsd = FakeSyncDaemon()
        self.dbus = dbusiface.DBusInterface(self.fsd)

    def check_sdt_called(self, name):
        """Assert that the last SyncDaemonTool method called was name."""
        self.assertEqual(self.dbus.sync_daemon_tool._called_method[0], name)

    def get_msd_called(self, name):
        """Assert the last fake SyncDaemon call was name; return its args."""
        called_method, called_args = self.fsd._called_method
        self.assertEqual(called_method, name)
        return called_args

    def fake_sdt_response(self, method_name, response):
        """Make the fake SyncDaemonTool answer with a fired deferred."""
        self.dbus.sync_daemon_tool._fake_response = (method_name,
                                                     defer.succeed(response))

class TestSignalHooking(SafeTests):
    """Check which callbacks get registered on the bus.

    Whether DBus really invokes them cannot be verified: DBus keeps the
    method object itself, so there is nothing to monkeypatch.
    """

    def _get_hooked(self, iface, signal):
        """Return the receiver registered for the signal, or None."""
        interface = iface
        if iface is not None:
            interface = 'com.ubuntuone.SyncDaemon.' + iface
        key = (interface, signal)
        return self.dbus._bus._callbacks.get(key)

    def test_hook_unhook(self):
        """Shutting down leaves no receiver registered on the bus."""
        self.dbus.shutdown()
        remaining = self.dbus._bus._callbacks
        self.assertEqual(remaining, {})

    def test_status_changed(self):
        """StatusChanged is hooked to _on_status_changed."""
        hooked = self._get_hooked('Status', 'StatusChanged')
        self.assertEqual(hooked, self.dbus._on_status_changed)

    def test_content_queue_changed(self):
        """ContentQueueChanged is hooked to _on_content_queue_changed."""
        hooked = self._get_hooked('Status', 'ContentQueueChanged')
        self.assertEqual(hooked, self.dbus._on_content_queue_changed)

    def test_name_owner_changed(self):
        """NameOwnerChanged is hooked with no interface prefix."""
        hooked = self._get_hooked(None, 'NameOwnerChanged')
        self.assertEqual(hooked, self.dbus._on_name_owner_changed)

    def test_folder_created_changed(self):
        """FolderCreated is hooked to _on_folder_created."""
        hooked = self._get_hooked('Folders', 'FolderCreated')
        self.assertEqual(hooked, self.dbus._on_folder_created)

    def test_folder_deleted_changed(self):
        """FolderDeleted is hooked to _on_folder_deleted."""
        hooked = self._get_hooked('Folders', 'FolderDeleted')
        self.assertEqual(hooked, self.dbus._on_folder_deleted)

    def test_share_created(self):
        """ShareCreated is hooked to _on_share_created."""
        hooked = self._get_hooked('Shares', 'ShareCreated')
        self.assertEqual(hooked, self.dbus._on_share_created)

    def test_share_deleted(self):
        """ShareDeleted is hooked to _on_share_deleted."""
        hooked = self._get_hooked('Shares', 'ShareDeleted')
        self.assertEqual(hooked, self.dbus._on_share_deleted)

    def test_share_changed(self):
        """ShareChanged is hooked to _on_share_changed."""
        hooked = self._get_hooked('Shares', 'ShareChanged')
        self.assertEqual(hooked, self.dbus._on_share_changed)


class TestSimpleCalls(SafeTests):
    """Simple calls answered straight from the fake bus."""

    @defer.inlineCallbacks
    def test_is_sd_started_yes(self):
        """A string name owner makes is_sd_started answer true."""
        bus = self.dbus._bus
        bus.fake_name_owner = 'some owner'
        started = yield self.dbus.is_sd_started()
        self.assertTrue(started)

    @defer.inlineCallbacks
    def test_is_sd_started_no(self):
        """A NameHasNoOwner error makes is_sd_started answer false."""
        no_owner = dbus.exceptions.DBusException(
            name='org.freedesktop.DBus.Error.NameHasNoOwner')
        self.dbus._bus.fake_name_owner = no_owner
        started = yield self.dbus.is_sd_started()
        self.assertFalse(started)


class TestDataProcessingStatus(SafeTests):
    """Status data is converted before reaching Magicicada's SyncDaemon."""

    def _assert_fields(self, received, expected):
        """Unpack the seven status fields and compare each in order."""
        name, descrip, error, connected, online, queues, connection = received
        fields = (name, descrip, error, connected, online, queues, connection)
        for got, wanted in zip(fields, expected):
            self.assertEqual(got, wanted)

    @defer.inlineCallbacks
    def test_get_status(self):
        """get_status delivers the fields with the flags as booleans."""
        status = dict(name='n', description='d', is_error='',
                      is_connected='True', is_online='', queues='q',
                      connection='c')
        self.fake_sdt_response('get_status', status)
        received = yield self.dbus.get_status()
        self._assert_fields(received,
                            ('n', 'd', False, True, False, 'q', 'c'))

    def test_status_changed(self):
        """The status changed callback forwards the converted fields."""
        status = dict(name='name', description='description', is_error='',
                      is_connected='True', is_online='', queues='queues',
                      connection='connection')
        self.dbus._on_status_changed(status)
        received = self.get_msd_called("on_sd_status_changed")
        self._assert_fields(received, ('name', 'description', False, True,
                                       False, 'queues', 'connection'))


class TestDataProcessingNameOwner(SafeTests):
    """Name owner changes are converted before being handed over."""

    def test_name_owner_changed_no_syncdaemon(self):
        """A change for some other bus name calls nothing on the fake."""
        self.dbus._on_name_owner_changed("foo", "bar", "baz")
        self.get_msd_called(None)

    def test_name_owner_changed_yes_syncdaemon_TF(self):
        """Old owner set and new owner empty is reported as False."""
        self.dbus._on_name_owner_changed("com.ubuntuone.SyncDaemon", "T", "")
        (received,) = self.get_msd_called("on_sd_name_owner_changed")
        self.assertEqual(received, False)

    def test_name_owner_changed_yes_syncdaemon_FT(self):
        """Old owner empty and new owner set is reported as True."""
        self.dbus._on_name_owner_changed("com.ubuntuone.SyncDaemon", "", "T")
        (received,) = self.get_msd_called("on_sd_name_owner_changed")
        self.assertEqual(received, True)


class TestDataProcessingCQ(SafeTests):
    """Content queue items are converted before being handed over."""

    def _assert_item(self, item, operation, path, share, node):
        """Compare the four attributes of a queue item, in order."""
        self.assertEqual(item.operation, operation)
        self.assertEqual(item.path, path)
        self.assertEqual(item.share, share)
        self.assertEqual(item.node, node)

    @defer.inlineCallbacks
    def test_nodata(self):
        """An empty waiting_content answer gives an empty queue."""
        self.fake_sdt_response('waiting_content', [])
        queue = yield self.dbus.get_content_queue()
        self.assertEqual(len(queue), 0)

    @defer.inlineCallbacks
    def test_one_item(self):
        """A single waiting item comes back as one queue entry."""
        raw = dict(operation='oper', path='path', share='share', node='node')
        self.fake_sdt_response('waiting_content', [raw])
        queue = yield self.dbus.get_content_queue()
        self.assertEqual(len(queue), 1)
        self._assert_item(queue[0], 'oper', 'path', 'share', 'node')

    @defer.inlineCallbacks
    def test_two_items(self):
        """Two waiting items come back in the same order."""
        raw1 = dict(operation='oper1', path='path1', share='share1',
                    node='node1')
        raw2 = dict(operation='oper2', path='path2', share='share2',
                    node='node2')
        self.fake_sdt_response('waiting_content', [raw1, raw2])
        queue = yield self.dbus.get_content_queue()
        self.assertEqual(len(queue), 2)
        self._assert_item(queue[0], 'oper1', 'path1', 'share1', 'node1')
        self._assert_item(queue[1], 'oper2', 'path2', 'share2', 'node2')


class TestDataProcessingMQ(SafeTests):
    """Meta queue commands are parsed before being handed over."""

    def _assert_item(self, item, operation, path=None, share=None, node=None):
        """Compare the four attributes of a queue item, in order."""
        self.assertEqual(item.operation, operation)
        self.assertEqual(item.path, path)
        self.assertEqual(item.share, share)
        self.assertEqual(item.node, node)

    @defer.inlineCallbacks
    def test_nodata(self):
        """An empty waiting_metadata answer gives an empty queue."""
        self.fake_sdt_response('waiting_metadata', [])
        queue = yield self.dbus.get_meta_queue()
        self.assertEqual(len(queue), 0)

    @defer.inlineCallbacks
    def test_one_item(self):
        """A single command comes back as one queue entry."""
        self.fake_sdt_response('waiting_metadata', ['ListShares'])
        queue = yield self.dbus.get_meta_queue()
        self.assertEqual(len(queue), 1)
        self._assert_item(queue[0], 'ListShares')

    @defer.inlineCallbacks
    def test_two_items(self):
        """Two commands in the meta queue come back in the same order."""
        make_dir = 'MakeDir(share_id=a, parent_id=b, name=c, marker=d)'
        public_files = 'GetPublicFiles'
        self.fake_sdt_response('waiting_metadata', [make_dir, public_files])
        queue = yield self.dbus.get_meta_queue()
        self.assertEqual(len(queue), 2)
        self._assert_item(queue[0], 'MakeDir', path='/?.../c', share='a')
        self._assert_item(queue[1], 'GetPublicFiles')

    @defer.inlineCallbacks
    def test_GetPublicFiles(self):
        """GetPublicFiles has no path, share or node."""
        self.fake_sdt_response('waiting_metadata', ['GetPublicFiles'])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'GetPublicFiles')

    @defer.inlineCallbacks
    def test_AccountInquiry(self):
        """AccountInquiry has no path, share or node."""
        self.fake_sdt_response('waiting_metadata', ['AccountInquiry'])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'AccountInquiry')

    @defer.inlineCallbacks
    def test_FreeSpaceInquiry(self):
        """FreeSpaceInquiry has no path, share or node."""
        self.fake_sdt_response('waiting_metadata', ['FreeSpaceInquiry'])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'FreeSpaceInquiry')

    @defer.inlineCallbacks
    def test_ListShares(self):
        """ListShares has no path, share or node."""
        self.fake_sdt_response('waiting_metadata', ['ListShares'])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'ListShares')

    @defer.inlineCallbacks
    def test_ListVolumes(self):
        """ListVolumes has no path, share or node."""
        self.fake_sdt_response('waiting_metadata', ['ListVolumes'])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'ListVolumes')

    @defer.inlineCallbacks
    def test_Query(self):
        """Query has no path, share or node."""
        self.fake_sdt_response('waiting_metadata', ['Query'])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'Query')

    @defer.inlineCallbacks
    def test_ListDir(self):
        """ListDir yields share and node, with '?' as the path."""
        command = 'ListDir(share_id=a, node_id=b, server_hash=c)'
        self.fake_sdt_response('waiting_metadata', [command])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'ListDir', path='?', share='a', node='b')

    @defer.inlineCallbacks
    def test_MakeDir(self):
        """MakeDir yields the share and a path ending in the name."""
        command = 'MakeDir(share_id=a, parent_id=b, name=c, marker=d)'
        self.fake_sdt_response('waiting_metadata', [command])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'MakeDir', path='/?.../c', share='a')

    @defer.inlineCallbacks
    def test_MakeFile(self):
        """MakeFile yields the share and a path ending in the name."""
        command = 'MakeFile(share_id=a, parent_id=b, name=c, marker=d)'
        self.fake_sdt_response('waiting_metadata', [command])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'MakeFile', path='/?.../c', share='a')

    @defer.inlineCallbacks
    def test_Unlink(self):
        """Unlink yields share and node, with '?' as the path."""
        command = 'Unlink(share_id=a, node_id=b, server_hash=c)'
        self.fake_sdt_response('waiting_metadata', [command])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'Unlink', path='?', share='a', node='b')

    @defer.inlineCallbacks
    def test_Move(self):
        """Move yields share, node and an 'old -> new' path."""
        command = ('Move(share_id=a, node_id=b, old_parent_id=c, '
                   'new_parent_id=d, new_name=e)')
        self.fake_sdt_response('waiting_metadata', [command])
        queue = yield self.dbus.get_meta_queue()
        self._assert_item(queue[0], 'Move', path='/?.../? -> /?.../e',
                          share='a', node='b')


class TestDataProcessingFolders(SafeTests):
    """Folder data is converted before being handed over."""

    def _assert_folder(self, folder, node, path, suggested_path, subscribed,
                       volume):
        """Compare the five attributes of a folder, in order."""
        self.assertEqual(folder.node, node)
        self.assertEqual(folder.path, path)
        self.assertEqual(folder.suggested_path, suggested_path)
        self.assertEqual(folder.subscribed, subscribed)
        self.assertEqual(folder.volume, volume)

    @defer.inlineCallbacks
    def test_nodata(self):
        """An empty get_folders answer gives no folders."""
        self.fake_sdt_response('get_folders', [])
        folders = yield self.dbus.get_folders()
        self.assertEqual(len(folders), 0)

    @defer.inlineCallbacks
    def test_one(self):
        """A single folder dict comes back as one folder."""
        raw = dict(node_id='nid', path=u'pth', subscribed='True',
                   suggested_path=u'sgp', type='UDF', volume_id='vid')
        self.fake_sdt_response('get_folders', [raw])
        folders = yield self.dbus.get_folders()
        self.assertEqual(len(folders), 1)
        self._assert_folder(folders[0], 'nid', u'pth', u'sgp', True, 'vid')

    @defer.inlineCallbacks
    def test_getting_info_two(self):
        """Two folder dicts come back in order, subscribed as booleans."""
        raw1 = dict(node_id='nid1', path=u'pth1', subscribed='True',
                    suggested_path=u'sgp1', type='UDF', volume_id='vid1')
        raw2 = dict(node_id='nid2', path=u'pth2', subscribed='',
                    suggested_path=u'sgp2', type='UDF', volume_id='vid2')
        self.fake_sdt_response('get_folders', [raw1, raw2])
        folders = yield self.dbus.get_folders()
        self.assertEqual(len(folders), 2)
        self._assert_folder(folders[0], 'nid1', u'pth1', u'sgp1', True,
                            'vid1')
        self._assert_folder(folders[1], 'nid2', u'pth2', u'sgp2', False,
                            'vid2')

    def test_folders_changed_from_created(self):
        """The folder created callback calls on_sd_folders_changed."""
        self.dbus._on_folder_created(None)
        self.get_msd_called("on_sd_folders_changed")

    def test_folders_changed_from_deleted(self):
        """The folder deleted callback calls on_sd_folders_changed."""
        self.dbus._on_folder_deleted(None)
        self.get_msd_called("on_sd_folders_changed")



class TestDataProcessingShares(SafeTests):
    """Share data is converted before being handed over."""

    # attributes compared by _assert_share, in the order they are checked
    _SHARE_ATTRS = ('accepted', 'access_level', 'free_bytes', 'name',
                    'node_id', 'other_username', 'other_visible_name',
                    'path', 'volume_id')

    def _share_data(self, share_type, **changes):
        """Build the raw share dict fed to the fake tool, with changes."""
        data = dict(accepted=u'True', access_level=u'View',
                    free_bytes=u'123456', name=u'foobar', node_id=u'node',
                    other_username=u'johndoe',
                    other_visible_name=u'John Doe', path=u'path',
                    volume_id=u'vol', type=share_type)
        data.update(changes)
        return data

    def _assert_share(self, share, expected):
        """Compare the share attributes with expected, one by one."""
        for attr, wanted in zip(self._SHARE_ATTRS, expected):
            self.assertEqual(getattr(share, attr), wanted)

    @defer.inlineCallbacks
    def test_sharestome_nodata(self):
        """An empty get_shares answer gives no shares to me."""
        self.fake_sdt_response('get_shares', [])
        shares = yield self.dbus.get_shares_to_me()
        self.assertEqual(len(shares), 0)

    @defer.inlineCallbacks
    def test_sharestoothers_nodata(self):
        """An empty list_shared answer gives no shares to others."""
        self.fake_sdt_response('list_shared', [])
        shares = yield self.dbus.get_shares_to_others()
        self.assertEqual(len(shares), 0)

    @defer.inlineCallbacks
    def test_sharestome_one(self):
        """A single share to me comes back converted."""
        raw = self._share_data(u'Share')
        self.fake_sdt_response('get_shares', [raw])
        shares = yield self.dbus.get_shares_to_me()
        self.assertEqual(len(shares), 1)
        self._assert_share(shares[0], (True, 'View', 123456, 'foobar',
                                       'node', 'johndoe', 'John Doe',
                                       'path', 'vol'))

    @defer.inlineCallbacks
    def test_sharestoother_one(self):
        """A single share to others comes back converted."""
        raw = self._share_data(u'Shared')
        self.fake_sdt_response('list_shared', [raw])
        shares = yield self.dbus.get_shares_to_others()
        self.assertEqual(len(shares), 1)
        self._assert_share(shares[0], (True, 'View', 123456, 'foobar',
                                       'node', 'johndoe', 'John Doe',
                                       'path', 'vol'))

    @defer.inlineCallbacks
    def test_sharestoother_two(self):
        """Two shares to others come back converted, in order."""
        raw1 = self._share_data(u'Shared')
        raw2 = self._share_data(u'Shared', accepted=u'',
                                access_level=u'Modify', free_bytes=u'789',
                                name=u'rulo', other_username=u'nn',
                                other_visible_name=u'Ene Ene')
        self.fake_sdt_response('list_shared', [raw1, raw2])
        shares = yield self.dbus.get_shares_to_others()
        self.assertEqual(len(shares), 2)
        self._assert_share(shares[0], (True, 'View', 123456, 'foobar',
                                       'node', 'johndoe', 'John Doe',
                                       'path', 'vol'))
        self._assert_share(shares[1], (False, 'Modify', 789, 'rulo',
                                       'node', 'nn', 'Ene Ene',
                                       'path', 'vol'))

    def test_shares_changed_from_created(self):
        """The share created callback calls on_sd_shares_changed."""
        self.dbus._on_share_created(None)
        self.get_msd_called("on_sd_shares_changed")

    def test_shares_changed_from_deleted(self):
        """The share deleted callback calls on_sd_shares_changed."""
        self.dbus._on_share_deleted(None)
        self.get_msd_called("on_sd_shares_changed")

    def test_shares_changed_from_changed(self):
        """The share changed callback calls on_sd_shares_changed."""
        self.dbus._on_share_changed(None)
        self.get_msd_called("on_sd_shares_changed")

    @defer.inlineCallbacks
    def test_with_no_free_bytes(self):
        """An empty free_bytes string comes back as None."""
        raw = self._share_data(u'Share', free_bytes=u'')
        self.fake_sdt_response('get_shares', [raw])
        shares = yield self.dbus.get_shares_to_me()
        self.assertEqual(len(shares), 1)
        self._assert_share(shares[0], (True, 'View', None, 'foobar',
                                       'node', 'johndoe', 'John Doe',
                                       'path', 'vol'))


class TestToolActions(SafeTests):
    """Actions forwarded to the SyncDaemonTool.

    Only the forwarding is checked here; what the calls return is covered
    by the data processing tests.
    """

    def _assert_forwarded(self, action):
        """Call the action on the interface and check the tool got it."""
        getattr(self.dbus, action)()
        self.check_sdt_called(action)

    def test_start(self):
        """start reaches the tool's start."""
        self._assert_forwarded("start")

    def test_quit(self):
        """quit reaches the tool's quit."""
        self._assert_forwarded("quit")

    def test_connect(self):
        """connect reaches the tool's connect."""
        self._assert_forwarded("connect")

    def test_disconnect(self):
        """disconnect reaches the tool's disconnect."""
        self._assert_forwarded("disconnect")


00684 class TestLogs(SafeTests):
    """Test logging."""

00687     def setUp(self):
        """Set up."""
        self.handler = MementoHandler()
        logging.getLogger('magicicada.dbusiface').addHandler(self.handler)
        self.handler.setLevel(logging.DEBUG)
        SafeTests.setUp(self)

00694     def test_instancing(self):
        """Just logged SD instancing."""
        self.assertTrue(self.handler.check_inf("DBus interface starting"))

00698     def test_shutdown(self):
        """Log when SD shutdowns."""
        self.dbus.shutdown()
        self.assertTrue(self.handler.check_inf("DBus interface going down"))

00703     def test_waiting_content(self):
        """Test call to waiting content."""
        self.dbus.get_content_queue()
        self.assertTrue(self.handler.check_inf("Getting content queue"))

00708     def test_waiting_meta(self):
        """Test call to waiting meta."""
        self.dbus.get_meta_queue()
        self.assertTrue(self.handler.check_inf("Getting meta queue"))

00713     def test_get_status(self):
        """Test call to status."""
        self.dbus.get_status()
        self.assertTrue(self.handler.check_inf("Getting status"))

00718     def test_get_folders(self):
        """Test call to folders."""
        self.dbus.get_folders()
        self.assertTrue(self.handler.check_inf("Getting folders"))

00723     def test_get_shares_to_me(self):
        """Test call to shares to me."""
        self.dbus.get_shares_to_me()
        self.assertTrue(self.handler.check_inf("Getting shares to me"))

00728     def test_get_shares_to_other(self):
        """Test call to shares to others."""
        self.dbus.get_shares_to_others()
        self.assertTrue(self.handler.check_inf("Getting shares to others"))

00733     def test_is_sd_started(self):
        """Test call to is_sd_started."""
        self.dbus.is_sd_started()
        self.assertTrue(self.handler.check_inf(
                        "Checking if SD is started: True"))

00739     def test_start(self):
        """Test call to start."""
        self.dbus.start()
        self.assertTrue(self.handler.check_inf("Calling start"))

00744     def test_quit(self):
        """Test call to quit."""
        self.dbus.quit()
        self.assertTrue(self.handler.check_inf("Calling quit"))

00749     def test_connect(self):
        """Test call to connect."""
        self.dbus.connect()
        self.assertTrue(self.handler.check_inf("Calling connect"))

00754     def test_disconnect(self):
        """Test call to disconnect."""
        self.dbus.disconnect()
        self.assertTrue(self.handler.check_inf("Calling disconnect"))

00759     def test_status_changed(self):
        """Test status changed callback."""
        d = dict(name='name', description='description', is_error='',
                 is_connected='True', is_online='', queues='queues',
                 connection='connection')
        self.dbus._on_status_changed(d)
        self.assertTrue(self.handler.check_inf("Received Status changed"))
        self.assertTrue(self.handler.check_dbg("Status changed data: %r" % d))

00768     def test_content_queue_changed(self):
        """Test content queue changed callback."""
        self.dbus._on_content_queue_changed("foo")
        self.assertTrue(self.handler.check_inf(
                        "Received Content Queue changed"))

00774     def test_name_owner_changed_other(self):
        """Test name owner changed callback, no SD."""
        self.dbus._on_name_owner_changed("other", "", "T")
        self.assertFalse(self.handler.check_inf("Received Name Owner changed"))

00779     def test_name_owner_changed_syncdaemon(self):
        """Test name owner changed callback, SD value ok."""
        self.dbus._on_name_owner_changed("com.ubuntuone.SyncDaemon", "", "T")
        self.assertTrue(self.handler.check_inf("Received Name Owner changed"))
        self.assertTrue(self.handler.check_dbg("Name Owner data: u'' u'T'"))

00785     def test_name_owner_changed_yes_syncdaemon_TF(self):
        """Test name owner changed callback, SD value bad."""
        self.dbus._on_name_owner_changed("com.ubuntuone.SyncDaemon", "F", "T")
        self.assertTrue(self.handler.check_inf("Received Name Owner changed"))
        self.assertTrue(self.handler.check_dbg("Name Owner data: u'F' u'T'"))
        self.assertTrue(self.handler.check("ERROR",
                        "Name Owner invalid data: Same bool in old and new!"))

00793     def test_folder_created_changed(self):
        """Test folder created changed callback."""
        self.dbus._on_folder_created("foo")
        self.assertTrue(self.handler.check_inf("Received Folder created"))

00798     def test_folder_deleted_changed(self):
        """Test folder deleted changed callback."""
        self.dbus._on_folder_deleted("foo")
        self.assertTrue(self.handler.check_inf("Received Folder deleted"))

00803     def test_share_created(self):
        """Test share created callback."""
        self.dbus._on_share_created("foo")
        self.assertTrue(self.handler.check_inf("Received Share created"))

00808     def test_share_deleted(self):
        """Test share deleted callback."""
        self.dbus._on_share_deleted("foo")
        self.assertTrue(self.handler.check_inf("Received Share deleted"))

00813     def test_share__changed(self):
        """Test share changed callback."""
        self.dbus._on_share_changed("foo")
        self.assertTrue(self.handler.check_inf("Received Share changed"))

    @defer.inlineCallbacks
    def test_content_queue_processing(self):
        """Getting the content queue with one item logs count and data.

        The item is served through fake_sdt_response for 'waiting_content'.
        """
        item = dict(operation='oper', path='path', share='share', node='node')
        self.fake_sdt_response('waiting_content', [item])
        yield self.dbus.get_content_queue()
        handler = self.handler
        count = handler.check_inf("Processing Content Queue items (1)")
        self.assertTrue(count)
        data = handler.check_dbg("    Content Queue data: %s" % item)
        self.assertTrue(data)

    @defer.inlineCallbacks
    def test_meta_queue_processing(self):
        """Getting the meta queue with one item logs count and data.

        The item is served through fake_sdt_response for 'waiting_metadata'.
        """
        self.fake_sdt_response('waiting_metadata', ['ListShares'])
        yield self.dbus.get_meta_queue()
        handler = self.handler
        count = handler.check_inf("Processing Meta Queue items (1)")
        self.assertTrue(count)
        data = handler.check_dbg("    Meta Queue data: u'ListShares'")
        self.assertTrue(data)

    @defer.inlineCallbacks
    def test_folders_processing(self):
        """Getting the folders with one folder logs count and data.

        The folder is served through fake_sdt_response for 'get_folders'.
        """
        folder = dict(node_id='nid', path=u'pth', subscribed='True',
                      suggested_path=u'sgp', type='UDF', volume_id='vid')
        self.fake_sdt_response('get_folders', [folder])
        yield self.dbus.get_folders()
        handler = self.handler
        count = handler.check_inf("Processing Folders items (1)")
        self.assertTrue(count)
        data = handler.check_dbg("    Folders data: %r" % folder)
        self.assertTrue(data)

    @defer.inlineCallbacks
    def test_sharestome_processing(self):
        """Getting the shares to me with one share logs count and data.

        The share is served through fake_sdt_response for 'get_shares'.
        """
        share = dict(accepted=u'True', access_level=u'View',
                     free_bytes=u'123456', name=u'foobar', node_id=u'node',
                     other_username=u'johndoe',
                     other_visible_name=u'John Doe', path=u'path',
                     volume_id=u'vol', type=u'Share')
        self.fake_sdt_response('get_shares', [share])
        yield self.dbus.get_shares_to_me()
        handler = self.handler
        count = handler.check_inf("Processing Shares To Me items (1)")
        self.assertTrue(count)
        data = handler.check_dbg("    Share data: %r" % share)
        self.assertTrue(data)

    @defer.inlineCallbacks
    def test_sharestoothers_processing(self):
        """Getting the shares to others with one share logs count and data.

        The share is served through fake_sdt_response for 'list_shared'.
        """
        share = dict(accepted=u'True', access_level=u'View',
                     free_bytes=u'123456', name=u'foobar', node_id=u'node',
                     other_username=u'johndoe',
                     other_visible_name=u'John Doe', path=u'path',
                     volume_id=u'vol', type=u'Shared')
        self.fake_sdt_response('list_shared', [share])
        yield self.dbus.get_shares_to_others()
        handler = self.handler
        count = handler.check_inf("Processing Shares To Others items (1)")
        self.assertTrue(count)
        data = handler.check_dbg("    Share data: %r" % share)
        self.assertTrue(data)


class RetryDecoratorTests(TwistedTestCase):
    """Test the retry decorator."""

    class Helper(object):
        """Callable that fails some times, and finally succeeds.

        Every call increments the 'cant' counter. While the counter is
        below 'limit' the call returns an already failed deferred carrying
        'excep'; from call number 'limit' on, it returns a deferred that
        already succeeded with True.

        If no exception is given, a DBus 'NoReply' one is used.
        """
        def __init__(self, limit, excep=None):
            self.cant = 0
            self.limit = limit
            if excep is None:
                excep = dbus.exceptions.DBusException(
                                    name='org.freedesktop.DBus.Error.NoReply')
            self.excep = excep

        def __call__(self):
            """Count the call, and fail or succeed according to the limit."""
            self.cant += 1
            if self.cant < self.limit:
                return defer.fail(self.excep)
            return defer.succeed(True)

    def test_retryexcep_noexcep(self):
        """Test _is_retry_exception with no error."""
        self.assertFalse(dbusiface._is_retry_exception("foo"))

    def test_retryexcep_stdexcep(self):
        """Test _is_retry_exception with a standard exception."""
        self.assertFalse(dbusiface._is_retry_exception(NameError("foo")))

    def test_retryexcep_dbusnoretry(self):
        """Test _is_retry_exception with DBus exception, but not retry."""
        err = dbus.exceptions.DBusException(name='org.freedesktop.DBus.Other')
        self.assertFalse(dbusiface._is_retry_exception(err))

    def test_retryexcep_dbusretry(self):
        """Test _is_retry_exception with DBus exception, retry."""
        err = dbus.exceptions.DBusException(
                                    name='org.freedesktop.DBus.Error.NoReply')
        self.assertTrue(dbusiface._is_retry_exception(err))

    def get_decorated_func(self, func):
        """Return a retryable-decorated function that calls 'func'."""

        @dbusiface.retryable
        def f():
            """Test func."""
            return func()

        return f

    def test_all_ok(self):
        """All ok."""
        f = self.get_decorated_func(lambda: defer.succeed(True))
        return f()

    # The tests below hand the real deferred chain to trial, instead of
    # firing a separate, manually built deferred: this way a failing
    # assertion (or an unexpected failure) ends the test right away with
    # the original traceback, and not with a timeout or a bare Exception.

    @defer.inlineCallbacks
    def test_one_fail(self):
        """One fail."""
        helper = self.Helper(2)
        yield self.get_decorated_func(helper)()
        self.assertEqual(helper.cant, 2)

    @defer.inlineCallbacks
    def test_two_fails(self):
        """Two fails."""
        helper = self.Helper(3)
        yield self.get_decorated_func(helper)()
        self.assertEqual(helper.cant, 3)

    def test_too_many_fails(self):
        """Check that retrying is not forever."""
        helper = self.Helper(12)
        d = self.get_decorated_func(helper)()
        # any failure is ok here, what is not acceptable is a success
        return self.assertFailure(d, Exception)

    def test_other_exception(self):
        """Less fails than limit, but not retrying exception."""
        helper = self.Helper(2, excep=NameError("foo"))
        d = self.get_decorated_func(helper)()
        # any failure is ok here, what is not acceptable is a success
        return self.assertFailure(d, Exception)

# Generated by Doxygen 1.6.0