Logo Search packages:      
Sourcecode: magicicada version File versions  Download package

dbusiface.py

# dbusiface.py
#
# Author: Facundo Batista <facundo@taniquetil.com.ar>
#
# Copyright 2010 Chicharreros
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""The DBus Interface."""

import collections
import logging
import re

import dbus
from dbus import SessionBus
from dbus.mainloop.glib import DBusGMainLoop
from twisted.internet import defer

from ubuntuone.syncdaemon.tools import SyncDaemonTool

# module-level logger used by everything in this file
logger = logging.getLogger('magicicada.dbusiface')

# immutable records built by DBusInterface out of the raw SyncDaemon data:
# one per queued operation, one per folder and one per share
QueueData = collections.namedtuple(
    'QueueData',
    'operation path share node',
)
FolderData = collections.namedtuple(
    'FolderData',
    'node path suggested_path subscribed volume',
)
ShareData = collections.namedtuple(
    'ShareData',
    'accepted access_level free_bytes name node_id '
    'other_username other_visible_name path volume_id',
)

# Regular expressions for parsing MetaQueue data.
#
# Each pattern captures the operation name first and then the values of the
# arguments shown in the operation's textual form.  They are raw strings so
# that the escaped parentheses reach the regex engine untouched instead of
# relying on Python leaving unknown string escapes alone.
RE_OP_LISTDIR = re.compile(r"(ListDir)\(share_id=(.*?), node_id=(.*?), .*")
RE_OP_UNLINK = re.compile(r"(Unlink)\(share_id=(.*?), node_id=(.*?), .*")
RE_OP_MAKEFILE = re.compile(
    r"(MakeFile)\(share_id=(.*?), parent_id=(.*?), name=(.*?), .*")
RE_OP_MAKEDIR = re.compile(
    r"(MakeDir)\(share_id=(.*?), parent_id=(.*?), name=(.*?), .*")
RE_OP_MOVE = re.compile(
    r"(Move)\(share_id=(.*?), node_id=(.*?), old_parent_id=(.*?), "
    r"new_parent_id=(.*?), new_name=(.*?)\)")


def _is_retry_exception(err):
    """Tell whether err is a DBus failure that deserves another attempt.

    Only a DBusException whose DBus name is the NoReply error qualifies;
    anything else returns False.
    """
    if not isinstance(err, dbus.exceptions.DBusException):
        return False
    return err.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply'

def retryable(func):
    """Call the function until its deferred succeeds (max 10 times).

    A failed attempt is retried only when _is_retry_exception() accepts the
    error.  Any other exception, or the failure of the last allowed attempt,
    is re-raised to the caller.  The decorated callable returns a deferred
    that fires with the result of the first successful call.
    """
    # imported here because the module does not import functools at the top
    import functools

    max_attempts = 10

    @defer.inlineCallbacks
    @functools.wraps(func)  # keep the name and docstring of the wrapped func
    def f(*a, **k):
        opportunities = max_attempts
        while opportunities:
            try:
                res = yield func(*a, **k)
            except Exception as err:
                opportunities -= 1
                # give up when out of attempts or the error is not retryable
                if opportunities == 0 or not _is_retry_exception(err):
                    raise
            else:
                break
        defer.returnValue(res)

    return f


class DBusInterface(object):
    """The DBus Interface to Ubuntu One's SyncDaemon.

    It listens to SyncDaemon related signals on the session bus, forwarding
    them to the ``on_sd_*`` callbacks of the object received at construction
    time, and wraps the SyncDaemonTool queries so they deliver namedtuples
    (QueueData, FolderData, ShareData) instead of raw DBus data.
    """

    def __init__(self, msd):
        """Connect to the session bus and hook up the signal receivers.

        msd is the object whose ``on_sd_*`` methods are called when the
        corresponding signals arrive.
        """
        # magicicada's syncdaemon
        self.msd = msd
        logger.info("DBus interface starting")

        # set up dbus and related stuff
        loop = DBusGMainLoop(set_as_default=True)
        self._bus = bus = SessionBus(mainloop=loop)
        self.sync_daemon_tool = SyncDaemonTool(bus)

        # hook up for signals and store info for the shutdown; each entry is
        # (handler, last part of the SyncDaemon interface name, signal name),
        # where None as interface means "do not filter by interface"
        _signals = [
            (self._on_status_changed, 'Status', 'StatusChanged'),
            (self._on_content_queue_changed, 'Status', 'ContentQueueChanged'),
            (self._on_name_owner_changed, None, 'NameOwnerChanged'),
            (self._on_folder_created, 'Folders', 'FolderCreated'),
            (self._on_folder_deleted, 'Folders', 'FolderDeleted'),
            (self._on_share_created, 'Shares', 'ShareCreated'),
            (self._on_share_deleted, 'Shares', 'ShareDeleted'),
            (self._on_share_changed, 'Shares', 'ShareChanged'),
        ]
        self._dbus_matches = []
        for method, dbus_lastname, signal_name in _signals:
            if dbus_lastname is None:
                dbus_interface = None
            else:
                dbus_interface = 'com.ubuntuone.SyncDaemon.' + dbus_lastname
            match = bus.add_signal_receiver(method,
                                            dbus_interface=dbus_interface,
                                            signal_name=signal_name)
            self._dbus_matches.append((match, dbus_interface, signal_name))

    def shutdown(self):
        """Shut down this interface, detaching the DBus signal receivers.

        SyncDaemon itself is not stopped here; use quit() for that.
        """
        logger.info("DBus interface going down")

        # remove the signals from DBus
        remove = self._bus.remove_signal_receiver
        for match, dbus_interface, signal in self._dbus_matches:
            remove(match, dbus_interface=dbus_interface, signal_name=signal)

    def _process_status(self, state):
        """Transform status information.

        Return the tuple (name, description, is_error, is_connected,
        is_online, queues, connection) taken from the state mapping, with
        the three ``is_*`` flags converted to bool.
        """
        name = state['name']
        description = state['description']
        is_error = bool(state['is_error'])
        is_connected = bool(state['is_connected'])
        is_online = bool(state['is_online'])
        queues = state['queues']
        connection = state['connection']
        return (name, description, is_error, is_connected,
                is_online, queues, connection)

    @retryable
    def get_status(self):
        """Get SD status.

        The resulting deferred delivers the tuple built by
        _process_status().
        """
        logger.info("Getting status")
        d = self.sync_daemon_tool.get_status()
        d.addCallback(self._process_status)
        return d

    def _on_status_changed(self, state):
        """Call the SD callback with the processed status."""
        logger.info("Received Status changed")
        logger.debug("Status changed data: %r", state)
        data = self._process_status(state)
        self.msd.on_sd_status_changed(*data)

    def _on_content_queue_changed(self, _):
        """Call the SD callback; the signal payload is ignored."""
        logger.info("Received Content Queue changed")
        self.msd.on_sd_content_queue_changed()

    def _on_name_owner_changed(self, name, oldowner, newowner):
        """Receive the NameOwnerChanged signal from DBus.

        Signals about names other than SyncDaemon's are ignored.  The SD
        callback receives True when the name got an owner and False when it
        lost it; a signal where both owners are set, or both are empty, is
        logged as an error and dropped.
        """
        if name != 'com.ubuntuone.SyncDaemon':
            return

        logger.info("Received Name Owner changed")
        logger.debug("Name Owner data: %r %r", oldowner, newowner)
        old = bool(oldowner)
        new = bool(newowner)
        if old == new:
            logger.error("Name Owner invalid data: Same bool in old and new!")
            return
        self.msd.on_sd_name_owner_changed(new)

    def _on_folder_created(self, _):
        """Call the SD callback; the signal payload is ignored."""
        logger.info("Received Folder created")
        self.msd.on_sd_folders_changed()

    def _on_folder_deleted(self, _):
        """Call the SD callback; the signal payload is ignored."""
        logger.info("Received Folder deleted")
        self.msd.on_sd_folders_changed()

    def _on_share_created(self, _):
        """Call the SD callback; the signal payload is ignored."""
        logger.info("Received Share created")
        self.msd.on_sd_shares_changed()

    def _on_share_deleted(self, _):
        """Call the SD callback; the signal payload is ignored."""
        logger.info("Received Share deleted")
        self.msd.on_sd_shares_changed()

    def _on_share_changed(self, _):
        """Call the SD callback; the signal payload is ignored."""
        logger.info("Received Share changed")
        self.msd.on_sd_shares_changed()

    @retryable
    def get_content_queue(self):
        """Get the content queue from SDT.

        The resulting deferred delivers a list of QueueData, one per item.
        """
        def process(data):
            """Enhance data format."""
            logger.info("Processing Content Queue items (%d)", len(data))
            all_items = []
            for d in data:
                logger.debug("    Content Queue data: %r", d)
                cq = QueueData(operation=d['operation'], path=d['path'],
                               node=d['node'], share=d['share'])
                all_items.append(cq)
            return all_items

        logger.info("Getting content queue")
        d = self.sync_daemon_tool.waiting_content()
        d.addCallback(process)
        return d

    def _parse_mq(self, data):
        """Parse MetaQueue string to extract its data.

        Return a QueueData for the operation described by the string.
        Paths are placeholders because the string only carries ids and
        names.  Raise ValueError if the operation is not supported.
        """
        # operations that come as a bare name, without arguments
        if data in ('AccountInquiry', 'FreeSpaceInquiry', 'GetPublicFiles',
                    'ListShares', 'ListVolumes', 'Query'):
            return QueueData(operation=data, path=None, node=None, share=None)

        # operations over an existing node; every pattern starts with its
        # own operation name, so at most one of them can match
        for regex in (RE_OP_LISTDIR, RE_OP_UNLINK):
            m = regex.match(data)
            if m:
                op, share, node = m.groups()
                path = '?'  # we should get the real path, no API now
                return QueueData(operation=op, path=path,
                                 node=node, share=share)

        # operations creating something under a parent (which is not used)
        for regex in (RE_OP_MAKEFILE, RE_OP_MAKEDIR):
            m = regex.match(data)
            if m:
                op, share, _, name = m.groups()
                # we should get the real path, no API now
                path = '/?.../' + name
                return QueueData(operation=op, path=path,
                                 node=None, share=share)

        m = RE_OP_MOVE.match(data)
        if m:
            # the old and new parent ids are captured but not used
            op, share, node, _, _, new_name = m.groups()

            # we should get the real info, no API now
            old_path = '/?...'
            old_name = '?'
            new_path = '/?...'
            composed_path = "%s/%s -> %s/%s" % (old_path, old_name,
                                                new_path, new_name)
            return QueueData(operation=op, path=composed_path,
                             node=node, share=share)

        raise ValueError("Not supported MetaQueue data: %r" % data)

    @retryable
    def get_meta_queue(self):
        """Get the meta queue from SDT.

        The resulting deferred delivers a list of QueueData, one per item,
        as parsed by _parse_mq().
        """
        def process(data):
            """Enhance data format."""
            logger.info("Processing Meta Queue items (%d)", len(data))
            all_items = []
            for d in data:
                logger.debug("    Meta Queue data: %r", d)
                parsed = self._parse_mq(d)
                all_items.append(parsed)
            return all_items

        logger.info("Getting meta queue")
        d = self.sync_daemon_tool.waiting_metadata()
        d.addCallback(process)
        return d

    @retryable
    def get_folders(self):
        """Get the folders info from SDT.

        The resulting deferred delivers a list of FolderData, one per item.
        """
        def process(data):
            """Enhance data format."""
            logger.info("Processing Folders items (%d)", len(data))
            all_items = []
            for d in data:
                logger.debug("    Folders data: %r", d)
                f = FolderData(node=d['node_id'], path=d['path'],
                               suggested_path=d['suggested_path'],
                               volume=d['volume_id'],
                               subscribed=bool(d['subscribed']))
                all_items.append(f)
            return all_items

        logger.info("Getting folders")
        d = self.sync_daemon_tool.get_folders()
        d.addCallback(process)
        return d

    def start(self):
        """Start SDT."""
        logger.info("Calling start")
        self.sync_daemon_tool.start()

    def quit(self):
        """Stop SDT."""
        logger.info("Calling quit")
        self.sync_daemon_tool.quit()

    def connect(self):
        """Connect SDT."""
        logger.info("Calling connect")
        self.sync_daemon_tool.connect()

    def disconnect(self):
        """Disconnect SDT."""
        logger.info("Calling disconnect")
        self.sync_daemon_tool.disconnect()

    def is_sd_started(self):
        """Find out if SD is active in the system.

        Return True if SyncDaemon's name has an owner in the bus, False if
        it has none.  Any other DBus error is propagated.
        """
        try:
            self._bus.get_name_owner('com.ubuntuone.SyncDaemon')
        except dbus.exceptions.DBusException as err:
            no_owner = 'org.freedesktop.DBus.Error.NameHasNoOwner'
            if err.get_dbus_name() != no_owner:
                raise
            started = False
        else:
            started = True
        logger.info("Checking if SD is started: %s", started)
        return started

    def _process_share_info(self, data):
        """Process share data.

        Return a list with a ShareData per item received; ``accepted`` is
        converted to bool and ``free_bytes`` to int, or to None when it
        comes as an empty string.
        """
        all_items = []
        for d in data:
            logger.debug("    Share data: %r", d)

            # some processing
            dfb = d['free_bytes']
            free_bytes = None if dfb == '' else int(dfb)

            s = ShareData(
                accepted=bool(d['accepted']),
                access_level=d['access_level'],
                free_bytes=free_bytes,
                name=d['name'],
                node_id=d['node_id'],
                other_username=d['other_username'],
                other_visible_name=d['other_visible_name'],
                path=d['path'],
                volume_id=d['volume_id'],
            )
            all_items.append(s)
        return all_items

    @retryable
    def get_shares_to_me(self):
        """Get the shares to me ('shares') info from SDT.

        The resulting deferred delivers a list of ShareData.
        """
        def process(data):
            """Enhance data format."""
            logger.info("Processing Shares To Me items (%d)", len(data))
            return self._process_share_info(data)

        logger.info("Getting shares to me")
        d = self.sync_daemon_tool.get_shares()
        d.addCallback(process)
        return d

    @retryable
    def get_shares_to_others(self):
        """Get the shares to others ('shared') info from SDT.

        The resulting deferred delivers a list of ShareData.
        """
        def process(data):
            """Enhance data format."""
            logger.info("Processing Shares To Others items (%d)", len(data))
            return self._process_share_info(data)

        logger.info("Getting shares to others")
        d = self.sync_daemon_tool.list_shared()
        d.addCallback(process)
        return d

Generated by  Doxygen 1.6.0   Back to index