Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/hardware
diff options
context:
space:
mode:
authorSimon Schampijer <simon@schampijer.de>2008-04-26 16:46:38 (GMT)
committer Simon Schampijer <simon@schampijer.de>2008-04-26 16:46:38 (GMT)
commit508eb82f64eb1b8445779391c5959083a15085db (patch)
treed2583262b205edea82730eccde08587514ffa66d /src/hardware
parent4609b417072ac652bdafa4a45b3c52cbd0700b00 (diff)
pylint hardware (part one)
Diffstat (limited to 'src/hardware')
-rw-r--r--src/hardware/hardwaremanager.py6
-rw-r--r--src/hardware/keydialog.py29
-rw-r--r--src/hardware/nmclient.py181
-rw-r--r--src/hardware/nminfo.py115
4 files changed, 195 insertions, 136 deletions
diff --git a/src/hardware/hardwaremanager.py b/src/hardware/hardwaremanager.py
index 29be450..5b9e330 100644
--- a/src/hardware/hardwaremanager.py
+++ b/src/hardware/hardwaremanager.py
@@ -38,9 +38,9 @@ class HardwareManager(object):
proxy = bus.get_object(_HARDWARE_MANAGER_SERVICE,
_HARDWARE_MANAGER_OBJECT_PATH)
self._service = dbus.Interface(proxy, _HARDWARE_MANAGER_INTERFACE)
- except dbus.DBusException, e:
+ except dbus.DBusException, err:
self._service = None
- logging.info('Hardware manager service not found.')
+ logging.info('Hardware manager service not found: %s' % err)
self._mixer = gst.element_factory_make('alsamixer')
self._mixer.set_state(gst.STATE_PAUSED)
@@ -53,7 +53,7 @@ class HardwareManager(object):
def get_volume(self):
if not self._mixer or not self._master:
logging.error('Cannot get the volume')
- return self._convert_volume(0)
+ return None
max_volume = self._master.max_volume
min_volume = self._master.min_volume
diff --git a/src/hardware/keydialog.py b/src/hardware/keydialog.py
index d336ab9..92f6776 100644
--- a/src/hardware/keydialog.py
+++ b/src/hardware/keydialog.py
@@ -19,7 +19,7 @@
import md5
from gettext import gettext as _
-import gobject, gtk
+import gtk
IW_AUTH_ALG_OPEN_SYSTEM = 0x00000001
IW_AUTH_ALG_SHARED_KEY = 0x00000002
@@ -61,7 +61,7 @@ def string_is_ascii(string):
try:
string.encode('ascii')
return True
- except:
+ except UnicodeEncodeError:
return False
def string_to_hex(passphrase):
@@ -88,11 +88,12 @@ class KeyDialog(gtk.Dialog):
self._net = net
self._async_cb = async_cb
self._async_err_cb = async_err_cb
+ self._entry = None
self.set_has_separator(False)
label = gtk.Label("A wireless encryption key is required for\n" \
- " the wireless network '%s'." % net.get_ssid())
+ " the wireless network '%s'." % net.get_ssid())
self.vbox.pack_start(label)
self.add_buttons(gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL,
@@ -265,7 +266,8 @@ class WPAKeyDialog(KeyDialog):
elif len(key) >= 8 and len(key) <= 63:
# passphrase
import commands
- (s, o) = commands.getstatusoutput("/usr/sbin/wpa_passphrase '%s' '%s'" % (ssid, key))
+ command = "/usr/sbin/wpa_passphrase '%s' '%s'" % (ssid, key)
+ (s, o) = commands.getstatusoutput(command)
if s != 0:
raise RuntimeError("Error hashing passphrase: %s" % o)
lines = o.split("\n")
@@ -297,7 +299,8 @@ class WPAKeyDialog(KeyDialog):
def create_security(self):
(we_cipher, key, wpa_ver) = self._get_security()
from nminfo import Security
- return Security.new_from_args(we_cipher, (key, wpa_ver, IW_AUTH_KEY_MGMT_PSK))
+ return Security.new_from_args(we_cipher,
+ (key, wpa_ver, IW_AUTH_KEY_MGMT_PSK))
def _update_response_sensitivity(self, ignored=None):
key = self._entry.get_text()
@@ -315,10 +318,12 @@ class WPAKeyDialog(KeyDialog):
def new_key_dialog(net, async_cb, async_err_cb):
caps = net.get_caps()
- if (caps & NM_802_11_CAP_CIPHER_TKIP or caps & NM_802_11_CAP_CIPHER_CCMP) and \
- (caps & NM_802_11_CAP_PROTO_WPA or caps & NM_802_11_CAP_PROTO_WPA2):
+ if (caps & NM_802_11_CAP_CIPHER_TKIP or caps & NM_802_11_CAP_CIPHER_CCMP) \
+ and (caps & NM_802_11_CAP_PROTO_WPA or \
+ caps & NM_802_11_CAP_PROTO_WPA2):
return WPAKeyDialog(net, async_cb, async_err_cb)
- elif (caps & NM_802_11_CAP_CIPHER_WEP40 or caps & NM_802_11_CAP_CIPHER_WEP104) and \
+ elif (caps & NM_802_11_CAP_CIPHER_WEP40 or \
+ caps & NM_802_11_CAP_CIPHER_WEP104) and \
(caps & NM_802_11_CAP_PROTO_WEP):
return WEPKeyDialog(net, async_cb, async_err_cb)
else:
@@ -331,8 +336,8 @@ class FakeNet(object):
return "olpcwpa"
def get_caps(self):
-# return NM_802_11_CAP_CIPHER_WEP104 | NM_802_11_CAP_PROTO_WEP
- return NM_802_11_CAP_CIPHER_CCMP | NM_802_11_CAP_CIPHER_TKIP | NM_802_11_CAP_PROTO_WPA
+ return NM_802_11_CAP_CIPHER_CCMP | NM_802_11_CAP_CIPHER_TKIP | \
+ NM_802_11_CAP_PROTO_WPA
def response_cb(widget, response_id):
if response_id == gtk.RESPONSE_OK:
@@ -344,8 +349,8 @@ def response_cb(widget, response_id):
if __name__ == "__main__":
- net = FakeNet()
- dialog = new_key_dialog(net, None, None)
+ fake_net = FakeNet()
+ dialog = new_key_dialog(fake_net, None, None)
dialog.connect("response", response_cb)
dialog.run()
diff --git a/src/hardware/nmclient.py b/src/hardware/nmclient.py
index d23a206..ca31a76 100644
--- a/src/hardware/nmclient.py
+++ b/src/hardware/nmclient.py
@@ -16,13 +16,11 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import logging
-import os
import dbus
import dbus.glib
import dbus.decorators
import gobject
-import gtk
from hardware import nminfo
from sugar.graphics import xocolor
@@ -30,17 +28,17 @@ from sugar.graphics import xocolor
IW_AUTH_ALG_OPEN_SYSTEM = 0x00000001
IW_AUTH_ALG_SHARED_KEY = 0x00000002
-NM_DEVICE_STAGE_STRINGS=("Unknown",
- "Prepare",
- "Config",
- "Need Users Key",
- "IP Config",
- "IP Config Get",
- "IP Config Commit",
- "Activated",
- "Failed",
- "Canceled"
-)
+NM_DEVICE_STAGE_STRINGS = ("Unknown",
+ "Prepare",
+ "Config",
+ "Need Users Key",
+ "IP Config",
+ "IP Config Get",
+ "IP Config Commit",
+ "Activated",
+ "Failed",
+ "Canceled"
+ )
NM_SERVICE = 'org.freedesktop.NetworkManager'
NM_IFACE = 'org.freedesktop.NetworkManager'
@@ -114,27 +112,29 @@ class Network(gobject.GObject):
self._strength = props[3]
self._mode = props[6]
self._caps = props[7]
- if self._caps & NM_802_11_CAP_PROTO_WPA or self._caps & NM_802_11_CAP_PROTO_WPA2:
+ if self._caps & NM_802_11_CAP_PROTO_WPA or \
+ self._caps & NM_802_11_CAP_PROTO_WPA2:
if not (self._caps & NM_802_11_CAP_KEY_MGMT_PSK):
# 802.1x is not supported at this time
- logging.debug("Net(%s): ssid '%s' dropping because 802.1x is unsupported" % (self._op,
- self._ssid))
+ logging.debug("Net(%s): ssid '%s' dropping because 802.1x" \
+ "is unsupported" % (self._op, self._ssid))
self._valid = False
self.emit('initialized', self._valid)
return
if self._mode != IW_MODE_INFRA:
# Don't show Ad-Hoc networks; they usually don't DHCP and therefore
- # won't work well here. This also works around the bug where we show
- # our own mesh SSID on the Mesh view when in mesh mode
- logging.debug("Net(%s): ssid '%s' is adhoc; not showing" % (self._op,
- self._ssid))
+ # won't work well here. This also works around the bug where
+ # we show our own mesh SSID on the Mesh view when in mesh mode
+ logging.debug("Net(%s): ssid '%s' is adhoc; not showing" %
+ (self._op, self._ssid))
self._valid = False
self.emit('initialized', self._valid)
return
fav_nets = []
if self._client.nminfo:
- fav_nets = self._client.nminfo.get_networks(nminfo.NETWORK_TYPE_ALLOWED)
+ fav_nets = self._client.nminfo.get_networks(
+ nminfo.NETWORK_TYPE_ALLOWED)
if self._ssid in fav_nets:
self._favorite = True
@@ -197,24 +197,24 @@ class Network(gobject.GObject):
class Device(gobject.GObject):
__gsignals__ = {
- 'initialized': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ([])),
- 'init-failed': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ([])),
- 'ssid-changed': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ([])),
- 'strength-changed': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ([])),
- 'state-changed': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ([])),
+ 'initialized': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ([])),
+ 'init-failed': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ([])),
+ 'ssid-changed': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ([])),
+ 'strength-changed': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ([])),
+ 'state-changed': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE, ([])),
'activation-stage-changed': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE, ([])),
- 'network-appeared': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
- ([gobject.TYPE_PYOBJECT])),
- 'network-disappeared': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
- ([gobject.TYPE_PYOBJECT]))
+ gobject.TYPE_NONE, ([])),
+ 'network-appeared': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE,
+ ([gobject.TYPE_PYOBJECT])),
+ 'network-disappeared': (gobject.SIGNAL_RUN_FIRST,
+ gobject.TYPE_NONE,
+ ([gobject.TYPE_PYOBJECT]))
}
def __init__(self, client, op):
@@ -289,7 +289,8 @@ class Device(gobject.GObject):
for op in net_ops:
net = Network(self._client, op)
self._networks[op] = net
- net.connect('initialized', lambda *args: self._net_initialized_cb(active_op, *args))
+ net.connect('initialized', lambda *args:
+ self._net_initialized_cb(active_op, *args))
def _update_error_cb(self, err):
logging.debug("Device(%s): failed to update. (%s)" % (self._op, err))
@@ -339,16 +340,15 @@ class Device(gobject.GObject):
raise RuntimeError("Only valid for mesh devices")
try:
step = self.dev.getMeshStep(timeout=3)
- except dbus.DBusException, e:
+ except dbus.DBusException:
step = 0
return step
def get_frequency(self):
- freq = 0.0
try:
freq = self.dev.getFrequency(timeout=3)
- except dbus.DBusException, e:
- pass
+ except dbus.DBusException:
+ freq = 0.0
# Hz -> GHz
self._freq = freq / 1000000000.0
return self._freq
@@ -373,7 +373,8 @@ class Device(gobject.GObject):
return
net = Network(self._client, network)
self._networks[network] = net
- net.connect('initialized', lambda *args: self._net_initialized_cb(None, *args))
+ net.connect('initialized', lambda *args:
+ self._net_initialized_cb(None, *args))
def network_disappeared(self, network):
if not self._networks.has_key(network):
@@ -396,8 +397,8 @@ class Device(gobject.GObject):
self._active_network = network
if self._active_network:
- self._active_net_sigid = self._active_network.connect("initialized",
- self._active_net_initialized);
+ self._active_net_sigid = self._active_network.connect(
+ "initialized", self._active_net_initialized)
# don't emit ssid-changed for networks that are not yet valid
if self._valid:
@@ -448,8 +449,10 @@ class Device(gobject.GObject):
if state == DEVICE_STATE_INACTIVE:
self.set_active_network(None)
else:
- self.dev.getActiveNetwork(reply_handler=lambda *args: self._get_active_net_cb(state, *args),
- error_handler=self._get_active_net_error_cb)
+ self.dev.getActiveNetwork(
+ reply_handler=lambda *args:
+ self._get_active_net_cb(state, *args),
+ error_handler=self._get_active_net_error_cb)
def set_activation_stage(self, stage):
if stage == self._act_stage:
@@ -485,16 +488,16 @@ class Device(gobject.GObject):
class NMClient(gobject.GObject):
__gsignals__ = {
'device-added' : (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
+ gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT])),
'device-activated' : (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
+ gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT])),
'device-activating': (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
+ gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT])),
'device-removed' : (gobject.SIGNAL_RUN_FIRST,
- gobject.TYPE_NONE,
+ gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT]))
}
@@ -503,6 +506,9 @@ class NMClient(gobject.GObject):
self.nminfo = None
self._nm_present = False
+ self._nm_proxy = None
+ self._nm_obj = None
+ self._sig_handlers = None
self._update_timer = 0
self._devices = {}
@@ -533,8 +539,9 @@ class NMClient(gobject.GObject):
logging.debug("Error updating devices (%s)" % err)
def _get_initial_devices(self):
- self._nm_obj.getDevices(reply_handler=self._get_initial_devices_reply_cb, \
- error_handler=self._get_initial_devices_error_cb)
+ self._nm_obj.getDevices(
+ reply_handler=self._get_initial_devices_reply_cb,
+ error_handler=self._get_initial_devices_error_cb)
def _add_device(self, dev_op):
if self._devices.has_key(dev_op):
@@ -576,20 +583,25 @@ class NMClient(gobject.GObject):
'DeviceActivating': self.device_activating_sig_handler,
'DeviceNowActive': self.device_now_active_sig_handler,
'DeviceNoLongerActive': self.device_no_longer_active_sig_handler,
- 'DeviceActivationFailed': self.device_activation_failed_sig_handler,
+ 'DeviceActivationFailed': \
+ self.device_activation_failed_sig_handler,
'DeviceCarrierOn': self.device_carrier_on_sig_handler,
'DeviceCarrierOff': self.device_carrier_off_sig_handler,
- 'DeviceStrengthChanged': self.wireless_device_strength_changed_sig_handler,
- 'WirelessNetworkAppeared': self.wireless_network_appeared_sig_handler,
- 'WirelessNetworkDisappeared': self.wireless_network_disappeared_sig_handler,
- 'WirelessNetworkStrengthChanged': self.wireless_network_strength_changed_sig_handler
+ 'DeviceStrengthChanged': \
+ self.wireless_device_strength_changed_sig_handler,
+ 'WirelessNetworkAppeared': \
+ self.wireless_network_appeared_sig_handler,
+ 'WirelessNetworkDisappeared': \
+ self.wireless_network_disappeared_sig_handler,
+ 'WirelessNetworkStrengthChanged': \
+ self.wireless_network_strength_changed_sig_handler
}
try:
self._nm_proxy = sys_bus.get_object(NM_SERVICE, NM_PATH)
self._nm_obj = dbus.Interface(self._nm_proxy, NM_IFACE)
except dbus.DBusException, e:
- logging.debug("Could not connect to NetworkManager!")
+ logging.debug("Could not connect to NetworkManager: %s" % e)
self._nm_present = False
return
@@ -598,30 +610,38 @@ class NMClient(gobject.GObject):
dbus_interface="org.freedesktop.DBus")
for (signal, handler) in self._sig_handlers.items():
- sys_bus.add_signal_receiver(handler, signal_name=signal, dbus_interface=NM_IFACE)
+ sys_bus.add_signal_receiver(handler, signal_name=signal,
+ dbus_interface=NM_IFACE)
# Find out whether or not NMI is running
try:
- bus_object = sys_bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
- name = bus_object.GetNameOwner("org.freedesktop.NetworkManagerInfo", \
+ bus_object = sys_bus.get_object('org.freedesktop.DBus',
+ '/org/freedesktop/DBus')
+ name_ = bus_object.GetNameOwner( \
+ "org.freedesktop.NetworkManagerInfo",
dbus_interface='org.freedesktop.DBus')
- if name:
- self._nm_present = True
+ self._nm_present = True
except dbus.DBusException:
- pass
+ self._nm_present = False
- def set_active_device(self, device, network=None, mesh_freq=None, mesh_start=None):
+ def set_active_device(self, device, network=None,
+ mesh_freq=None, mesh_start=None):
ssid = ""
if network:
ssid = network.get_ssid()
if device.get_type() == DEVICE_TYPE_802_11_MESH_OLPC:
if mesh_freq or mesh_start:
if mesh_freq and not mesh_start:
- self._nm_obj.setActiveDevice(device.get_op(), dbus.Double(mesh_freq))
+ self._nm_obj.setActiveDevice(device.get_op(),
+ dbus.Double(mesh_freq))
elif mesh_start and not mesh_freq:
- self._nm_obj.setActiveDevice(device.get_op(), dbus.Double(0.0), dbus.UInt32(mesh_start))
+ self._nm_obj.setActiveDevice(device.get_op(),
+ dbus.Double(0.0),
+ dbus.UInt32(mesh_start))
else:
- self._nm_obj.setActiveDevice(device.get_op(), dbus.Double(mesh_freq), dbus.UInt32(mesh_start))
+ self._nm_obj.setActiveDevice(device.get_op(),
+ dbus.Double(mesh_freq),
+ dbus.UInt32(mesh_start))
else:
self._nm_obj.setActiveDevice(device.get_op())
else:
@@ -631,37 +651,43 @@ class NMClient(gobject.GObject):
logging.debug('NM State Changed to %d' % new_state)
def device_activation_stage_sig_handler(self, device, stage):
- logging.debug('Device Activation Stage "%s" for device %s' % (NM_DEVICE_STAGE_STRINGS[stage], device))
+ logging.debug('Device Activation Stage "%s" for device %s'
+ % (NM_DEVICE_STAGE_STRINGS[stage], device))
if not self._devices.has_key(device):
- logging.debug('DeviceActivationStage, device %s does not exist' % (device))
+ logging.debug('DeviceActivationStage, device %s does not exist'
+ % (device))
return
self._devices[device].set_activation_stage(stage)
def device_activating_sig_handler(self, device):
logging.debug('DeviceActivating for %s' % (device))
if not self._devices.has_key(device):
- logging.debug('DeviceActivating, device %s does not exist' % (device))
+ logging.debug('DeviceActivating, device %s does not exist'
+ % (device))
return
self._devices[device].set_state(DEVICE_STATE_ACTIVATING)
def device_now_active_sig_handler(self, device, ssid=None):
logging.debug('DeviceNowActive for %s' % (device))
if not self._devices.has_key(device):
- logging.debug('DeviceNowActive, device %s does not exist' % (device))
+ logging.debug('DeviceNowActive, device %s does not exist'
+ % (device))
return
self._devices[device].set_state(DEVICE_STATE_ACTIVATED)
def device_no_longer_active_sig_handler(self, device):
logging.debug('DeviceNoLongerActive for %s' % (device))
if not self._devices.has_key(device):
- logging.debug('DeviceNoLongerActive, device %s does not exist' % (device))
+ logging.debug('DeviceNoLongerActive, device %s does not exist'
+ % (device))
return
self._devices[device].set_state(DEVICE_STATE_INACTIVE)
def device_activation_failed_sig_handler(self, device, ssid=None):
logging.debug('DeviceActivationFailed for %s' % (device))
if not self._devices.has_key(device):
- logging.debug('DeviceActivationFailed, device %s does not exist' % (device))
+ logging.debug('DeviceActivationFailed, device %s does not exist'
+ % (device))
return
self._devices[device].set_state(DEVICE_STATE_INACTIVE)
@@ -703,7 +729,8 @@ class NMClient(gobject.GObject):
return
self._devices[device].set_strength(strength)
- def wireless_network_strength_changed_sig_handler(self, device, network, strength):
+ def wireless_network_strength_changed_sig_handler(self, device,
+ network, strength):
if not self._devices.has_key(device):
return
net = self._devices[device].get_network(network)
diff --git a/src/hardware/nminfo.py b/src/hardware/nminfo.py
index 3a93120..117cc4b 100644
--- a/src/hardware/nminfo.py
+++ b/src/hardware/nminfo.py
@@ -29,8 +29,8 @@ import keydialog
import gtk
from sugar import env
-IW_AUTH_KEY_MGMT_802_1X = 0x1
-IW_AUTH_KEY_MGMT_PSK = 0x2
+IW_AUTH_KEY_MGMT_802_1X = 0x1
+IW_AUTH_KEY_MGMT_PSK = 0x2
IW_AUTH_WPA_VERSION_DISABLED = 0x00000001
IW_AUTH_WPA_VERSION_WPA = 0x00000002
@@ -46,8 +46,8 @@ IW_AUTH_CIPHER_WEP104 = 0x00000010
IW_AUTH_ALG_OPEN_SYSTEM = 0x00000001
IW_AUTH_ALG_SHARED_KEY = 0x00000002
-NM_INFO_IFACE='org.freedesktop.NetworkManagerInfo'
-NM_INFO_PATH='/org/freedesktop/NetworkManagerInfo'
+NM_INFO_IFACE = 'org.freedesktop.NetworkManagerInfo'
+NM_INFO_PATH = '/org/freedesktop/NetworkManagerInfo'
class NoNetworks(dbus.DBusException):
@@ -68,39 +68,39 @@ class NetworkInvalidError(Exception):
class NMConfig(ConfigParser.ConfigParser):
def get_bool(self, section, name):
opt = self.get(section, name)
- if type(opt) == type(""):
+ if type(opt) == str:
if opt.lower() == 'yes' or opt.lower() == 'true':
return True
elif opt.lower() == 'no' or opt.lower() == 'false':
return False
- raise ValueError("Invalid format for %s/%s. Should be one of [yes, no, true, false]." % (section, name))
+ raise ValueError("Invalid format for %s/%s. Should be one of" \
+ " [yes, no, true, false]." % (section, name))
def get_list(self, section, name):
opt = self.get(section, name)
- if type(opt) == type(""):
- if not len(opt):
- return []
- try:
- return opt.split()
- except Exception:
- pass
- raise ValueError("Invalid format for %s/%s. Should be a space-separate list." % (section, name))
+ if type(opt) != str or not len(opt):
+ return []
+ try:
+ return opt.split()
+ except Exception:
+ raise ValueError("Invalid format for %s/%s. Should be a" \
+ " space-separate list." % (section, name))
def get_int(self, section, name):
opt = self.get(section, name)
try:
return int(opt)
- except Exception:
- pass
- raise ValueError("Invalid format for %s/%s. Should be a valid integer." % (section, name))
+ except ValueError:
+ raise ValueError("Invalid format for %s/%s. Should be a" \
+ " valid integer." % (section, name))
def get_float(self, section, name):
opt = self.get(section, name)
try:
return float(opt)
- except Exception:
- pass
- raise ValueError("Invalid format for %s/%s. Should be a valid float." % (section, name))
+ except ValueError:
+ raise ValueError("Invalid format for %s/%s. Should be a" \
+ " valid float." % (section, name))
NETWORK_TYPE_UNKNOWN = 0
@@ -111,6 +111,8 @@ NETWORK_TYPE_INVALID = 2
class Security(object):
def __init__(self, we_cipher):
self._we_cipher = we_cipher
+ self._key = None
+ self._auth_alg = None
def read_from_config(self, cfg, name):
pass
@@ -123,9 +125,12 @@ class Security(object):
we_cipher = cfg.get_int(name, "we_cipher")
if we_cipher == IW_AUTH_CIPHER_NONE:
security = Security(we_cipher)
- elif we_cipher == IW_AUTH_CIPHER_WEP40 or we_cipher == IW_AUTH_CIPHER_WEP104:
+ elif we_cipher == IW_AUTH_CIPHER_WEP40 or \
+ we_cipher == IW_AUTH_CIPHER_WEP104:
security = WEPSecurity(we_cipher)
- elif we_cipher == NM_AUTH_TYPE_WPA_PSK_AUTO or we_cipher == IW_AUTH_CIPHER_CCMP or we_cipher == IW_AUTH_CIPHER_TKIP:
+ elif we_cipher == NM_AUTH_TYPE_WPA_PSK_AUTO or \
+ we_cipher == IW_AUTH_CIPHER_CCMP or \
+ we_cipher == IW_AUTH_CIPHER_TKIP:
security = WPASecurity(we_cipher)
else:
raise ValueError("Unsupported security combo")
@@ -138,9 +143,12 @@ class Security(object):
try:
if we_cipher == IW_AUTH_CIPHER_NONE:
security = Security(we_cipher)
- elif we_cipher == IW_AUTH_CIPHER_WEP40 or we_cipher == IW_AUTH_CIPHER_WEP104:
+ elif we_cipher == IW_AUTH_CIPHER_WEP40 or \
+ we_cipher == IW_AUTH_CIPHER_WEP104:
security = WEPSecurity(we_cipher)
- elif we_cipher == NM_AUTH_TYPE_WPA_PSK_AUTO or we_cipher == IW_AUTH_CIPHER_CCMP or we_cipher == IW_AUTH_CIPHER_TKIP:
+ elif we_cipher == NM_AUTH_TYPE_WPA_PSK_AUTO or \
+ we_cipher == IW_AUTH_CIPHER_CCMP or \
+ we_cipher == IW_AUTH_CIPHER_TKIP:
security = WPASecurity(we_cipher)
else:
raise ValueError("Unsupported security combo")
@@ -183,13 +191,15 @@ class WEPSecurity(Security):
raise ValueError("Key length not right for 104-bit WEP")
try:
- a = binascii.a2b_hex(self._key)
+ binascii.a2b_hex(self._key)
except TypeError:
raise ValueError("Key was not a hexadecimal string.")
self._auth_alg = cfg.get_int(name, "auth_alg")
- if self._auth_alg != IW_AUTH_ALG_OPEN_SYSTEM and self._auth_alg != IW_AUTH_ALG_SHARED_KEY:
- raise ValueError("Invalid authentication algorithm %d" % self._auth_alg)
+ if self._auth_alg != IW_AUTH_ALG_OPEN_SYSTEM and \
+ self._auth_alg != IW_AUTH_ALG_SHARED_KEY:
+ raise ValueError("Invalid authentication algorithm %d"
+ % self._auth_alg)
def get_properties(self):
args = Security.get_properties(self)
@@ -203,6 +213,11 @@ class WEPSecurity(Security):
config.set(section, "auth_alg", self._auth_alg)
class WPASecurity(Security):
+ def __init__(self, we_cipher):
+ Security.__init__(self, we_cipher)
+ self._wpa_ver = None
+ self._key_mgmt = None
+
def read_from_args(self, args):
if len(args) != 3:
raise ValueError("not enough arguments")
@@ -220,7 +235,8 @@ class WPASecurity(Security):
if not isinstance(key_mgmt, int):
raise ValueError("wrong argument type for WPA key management")
if not key_mgmt & IW_AUTH_KEY_MGMT_PSK:
- raise ValueError("Key management types other than PSK are not supported")
+ raise ValueError("Key management types other than" \
+ " PSK are not supported")
self._key = key
self._wpa_ver = wpa_ver
@@ -233,17 +249,19 @@ class WPASecurity(Security):
raise ValueError("Key length not right for WPA-PSK")
try:
- a = binascii.a2b_hex(self._key)
+ binascii.a2b_hex(self._key)
except TypeError:
raise ValueError("Key was not a hexadecimal string.")
self._wpa_ver = cfg.get_int(name, "wpa_ver")
- if self._wpa_ver != IW_AUTH_WPA_VERSION_WPA and self._wpa_ver != IW_AUTH_WPA_VERSION_WPA2:
+ if self._wpa_ver != IW_AUTH_WPA_VERSION_WPA and \
+ self._wpa_ver != IW_AUTH_WPA_VERSION_WPA2:
raise ValueError("Invalid WPA version %d" % self._wpa_ver)
self._key_mgmt = cfg.get_int(name, "key_mgmt")
if not self._key_mgmt & IW_AUTH_KEY_MGMT_PSK:
- raise ValueError("Invalid WPA key management option %d" % self._key_mgmt)
+ raise ValueError("Invalid WPA key management option %d"
+ % self._key_mgmt)
def get_properties(self):
args = Security.get_properties(self)
@@ -271,7 +289,8 @@ class Network:
bssid_list = dbus.Array([], signature="s")
for item in self.bssids:
bssid_list.append(dbus.String(item))
- args = [dbus.String(self.ssid), dbus.Int32(self.timestamp), dbus.Boolean(True), bssid_list]
+ args = [dbus.String(self.ssid), dbus.Int32(self.timestamp),
+ dbus.Boolean(True), bssid_list]
args += self._security.get_properties()
return tuple(args)
@@ -306,7 +325,7 @@ class Network:
try:
self.bssids = config.get_list(self.ssid, "bssids")
except (ConfigParser.NoOptionError, ValueError), e:
- pass
+ logging.debug("Error reading bssids: %s" % e)
def write_to_config(self, config):
try:
@@ -332,15 +351,18 @@ class NMInfoDBusServiceHelper(dbus.service.Object):
bus = dbus.SystemBus()
# If NMI is already around, don't grab the NMI service
- bus_object = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
+ bus_object = bus.get_object('org.freedesktop.DBus',
+ '/org/freedesktop/DBus')
name = None
try:
- name = bus_object.GetNameOwner("org.freedesktop.NetworkManagerInfo", \
+ name = bus_object.GetNameOwner( \
+ "org.freedesktop.NetworkManagerInfo",
dbus_interface='org.freedesktop.DBus')
except dbus.DBusException:
pass
if name:
- logging.debug("NMI service already owned by %s, won't claim it." % name)
+ logging.debug("NMI service already owned by %s, won't claim it."
+ % name)
raise RuntimeError
bus_name = dbus.service.BusName(NM_INFO_IFACE, bus=bus)
@@ -354,16 +376,20 @@ class NMInfoDBusServiceHelper(dbus.service.Object):
raise NoNetworks()
- @dbus.service.method(NM_INFO_IFACE, in_signature='si', async_callbacks=('async_cb', 'async_err_cb'))
+ @dbus.service.method(NM_INFO_IFACE, in_signature='si',
+ async_callbacks=('async_cb', 'async_err_cb'))
def getNetworkProperties(self, ssid, net_type, async_cb, async_err_cb):
- self._parent.get_network_properties(ssid, net_type, async_cb, async_err_cb)
+ self._parent.get_network_properties(ssid, net_type,
+ async_cb, async_err_cb)
@dbus.service.method(NM_INFO_IFACE)
def updateNetworkInfo(self, ssid, bauto, bssid, cipher, *args):
self._parent.update_network_info(ssid, bauto, bssid, cipher, args)
- @dbus.service.method(NM_INFO_IFACE, async_callbacks=('async_cb', 'async_err_cb'))
- def getKeyForNetwork(self, dev_path, net_path, ssid, attempt, new_key, async_cb, async_err_cb):
+ @dbus.service.method(NM_INFO_IFACE,
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def getKeyForNetwork(self, dev_path, net_path, ssid, attempt,
+ new_key, async_cb, async_err_cb):
self._parent.get_key_for_network(dev_path, net_path, ssid,
attempt, new_key, async_cb, async_err_cb)
@@ -399,7 +425,8 @@ class NMInfo(object):
net.read_from_config(config)
networks[name] = net
except Exception, e:
- logging.error("Error when processing config for the network %s: %r" % (name, e))
+ logging.error("Error when processing config for" \
+ " the network %s: %r" % (name, e))
del config
return networks
@@ -456,7 +483,8 @@ class NMInfo(object):
logging.debug("Error updating network information: %s" % e)
del net
- def get_key_for_network(self, dev_op, net_op, ssid, attempt, new_key, async_cb, async_err_cb):
+ def get_key_for_network(self, dev_op, net_op, ssid, attempt,
+ new_key, async_cb, async_err_cb):
if not isinstance(ssid, unicode):
raise ValueError("Invalid arguments; ssid must be unicode.")
if self._allowed_networks.has_key(ssid) and not new_key:
@@ -487,7 +515,7 @@ class NMInfo(object):
self._key_dialog.connect("destroy", self._key_dialog_destroy_cb)
self._key_dialog.show_all()
- def _key_dialog_destroy_cb(self, widget, foo=None):
+ def _key_dialog_destroy_cb(self, widget, data=None):
if widget != self._key_dialog:
return
self._key_dialog_response_cb(widget, gtk.RESPONSE_CANCEL)
@@ -497,7 +525,6 @@ class NMInfo(object):
return
(async_cb, async_err_cb) = self._key_dialog.get_callbacks()
- net = self._key_dialog.get_network()
security = None
if response_id == gtk.RESPONSE_OK:
security = self._key_dialog.create_security()