glances_ip.py 6.8 KB
Newer Older
1 2 3 4
# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
N
nicolargo 已提交
5
# Copyright (C) 2019 Nicolargo <nicolas@nicolargo.com>
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#
# Glances is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Glances is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""IP plugin."""

22
import threading
23
from json import loads
24

N
nicolargo 已提交
25
from glances.compat import iterkeys, urlopen, queue
26
from glances.logger import logger
27
from glances.timer import Timer
28 29
from glances.plugins.glances_plugin import GlancesPlugin

30 31 32 33 34
# Import plugin specific dependency
try:
    import netifaces
except ImportError as e:
    import_error_tag = True
N
nicolargo 已提交
35
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
36
else:
37
    import_error_tag = False
38

39 40 41 42 43
# Online services used to retrieve the public IP address.
# Each entry is a tuple (url, json, key):
# - url: URL of the web service
# - json: True if the service returns JSON, False if it returns a raw string
# - key: key of the IP address in the JSON structure (None for raw strings)
urls = [
    ('https://ip.42.pl/raw', False, None),
    ('https://httpbin.org/ip', True, 'origin'),
    ('https://jsonip.com', True, 'ip'),
    ('https://api.ipify.org/?format=json', True, 'ip'),
]

49 50

class Plugin(GlancesPlugin):
    """Glances IP Plugin.

    stats is a dict which, when the information is available, holds the
    keys 'address', 'mask', 'mask_cidr', 'gateway' and 'public_address'.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super(Plugin, self).__init__(args=args, config=config)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Always define the attribute so that it exists even when the
        # netifaces lib is missing and the lookup below is skipped.
        self.public_address = None

        # Get the public IP address once (not for each refresh)
        if not import_error_tag:
            self.public_address = PublicIpAddress().get()

    @GlancesPlugin._check_decorator
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        Stats is dict. The refreshed dict is stored in self.stats and
        returned.
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Update stats using the netifaces lib
            try:
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab the default gateway ({})".format(e))
            else:
                # default_gw is indexed as [0] gateway address, [1] interface
                # name. Reuse it (and a single ifaddresses() lookup) so that
                # all the fields come from the same snapshot.
                try:
                    inet = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
                    stats['address'] = inet['addr']
                    stats['mask'] = inet['netmask']
                    stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
                    stats['gateway'] = default_gw[0]
                    stats['public_address'] = self.public_address
                except (KeyError, AttributeError, IndexError, ValueError) as e:
                    # IndexError: empty address list for the interface.
                    # ValueError: interface rejected by netifaces (e.g. it
                    # disappeared since the gateway lookup) or a netmask
                    # that is not a dotted list of integers.
                    logger.debug("Cannot grab IP information: {}".format(e))
        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super(Plugin, self).update_views()

        # Add specific information
        # Every field of this plugin is flagged as optional
        for key in iterkeys(self.stats):
            self.views[key]['optional'] = True

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disable() or import_error_tag:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg))
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE'))
        if 'address' in self.stats:
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg))
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            # Only display the public address when one has been retrieved
            if self.stats['public_address'] is not None:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE'))
                ret.append(self.curse_add_line(msg_pub))

        return ret

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        Example: '255.255.255.0' will return 24

        Return 0 if ip is None.
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        # Count the bits set to 1 in each dotted component of the mask
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
157 158 159


class PublicIpAddress(object):
    """Get public IP address from online services."""

    # Maximum time (in seconds) spent blocked on the result queue before the
    # global timer is checked again.
    _POLL_INTERVAL = 0.1

    def __init__(self, timeout=2):
        """Init the class.

        timeout: maximum time (in seconds) to wait for an answer. It is used
        both for each HTTP request and for the overall wait in get().
        """
        self.timeout = timeout

    def get(self):
        """Get the first public IP address returned by one of the online services.

        One daemon thread is started per entry of the urls list. Return the
        first non-None answer as a string (comma separated if the service
        reports several addresses), or None if no service answered before
        the timeout or if all of them failed.
        """
        q = queue.Queue()

        for u, j, k in urls:
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
            t.daemon = True
            t.start()

        timer = Timer(self.timeout)
        ip = None
        # Each worker posts exactly one result (None on failure): stop as soon
        # as all of them have reported instead of waiting for the timer.
        pending = len(urls)
        while ip is None and pending > 0 and not timer.finished():
            try:
                # Block for a short slice rather than spinning on qsize()
                ip = q.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue
            pending -= 1

        if ip is None:
            return None

        return self._normalize(ip)

    @staticmethod
    def _normalize(ip):
        """Return the comma separated addresses of ip, stripped and de-duplicated.

        The order of first appearance is kept, so the result is deterministic.
        """
        unique = []
        for part in ip.split(','):
            part = part.strip()
            if part not in unique:
                unique.append(part)
        return ', '.join(unique)

    def _get_ip_public(self, queue_target, url, json=False, key=None):
        """Request the url service and put the result in the queue_target.

        Exactly one item is put in queue_target: the raw response if json is
        False, the value stored under key in the decoded JSON if json is
        True, or None if the request or the decoding failed.
        """
        try:
            handle = urlopen(url, timeout=self.timeout)
            try:
                response = handle.read().decode('utf-8')
            finally:
                # Always release the connection, even if the read fails
                handle.close()
        except Exception as e:
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
            queue_target.put(None)
        else:
            # Request depend on service
            if not json:
                queue_target.put(response)
                return
            try:
                value = loads(response)[key]
            except (ValueError, KeyError, TypeError):
                # ValueError: the answer is not valid JSON.
                # KeyError / TypeError: the JSON does not have the expected
                # key or shape. Report a failure instead of killing the thread
                # without posting a result.
                value = None
            queue_target.put(value)