| 1 | # Copyright (C) 2006 The University of Waikato |
|---|
| 2 | # |
|---|
| 3 | # This file is part of crcnetd - CRCnet Configuration System Daemon |
|---|
| 4 | # |
|---|
| 5 | # Status Module - Keeps track of the status of hosts in the system |
|---|
| 6 | # |
|---|
| 7 | # Author: Matt Brown <matt@crc.net.nz> |
|---|
| 8 | # Version: $Id$ |
|---|
| 9 | # |
|---|
| 10 | # crcnetd is free software; you can redistribute it and/or modify it under the |
|---|
| 11 | # terms of the GNU General Public License version 2 as published by the Free |
|---|
| 12 | # Software Foundation. |
|---|
| 13 | # |
|---|
| 14 | # crcnetd is distributed in the hope that it will be useful, but WITHOUT ANY |
|---|
| 15 | # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|---|
| 16 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
|---|
| 17 | # details. |
|---|
| 18 | # |
|---|
| 19 | # You should have received a copy of the GNU General Public License along with |
|---|
| 20 | # crcnetd; if not, write to the Free Software Foundation, Inc., |
|---|
| 21 | # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
|---|
| 22 | import os.path |
|---|
| 23 | import threading |
|---|
| 24 | from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher |
|---|
| 25 | from pysnmp.carrier.asynsock.dgram import udp |
|---|
| 26 | from pyasn1.codec.ber import encoder, decoder |
|---|
| 27 | from pysnmp.proto import api |
|---|
| 28 | from time import time |
|---|
| 29 | import random |
|---|
| 30 | |
|---|
| 31 | from crcnetd._utils.ccsd_common import * |
|---|
| 32 | from crcnetd._utils.ccsd_log import * |
|---|
| 33 | from crcnetd._utils.ccsd_events import * |
|---|
| 34 | from crcnetd._utils.ccsd_session import getSession, getSessionE |
|---|
| 35 | from crcnetd._utils.ccsd_server import exportViaXMLRPC, initThread |
|---|
| 36 | from crcnetd._utils.ccsd_cfengine import getCfengineHostStatus |
|---|
| 37 | from crcnetd.modules.ccs_host import ccs_host, getHostList |
|---|
| 38 | |
|---|
# Module type marker. CCSD_SERVER is not defined in this file, so it is
# expected to come from one of the star imports above.
# NOTE(review): presumably read by the daemon's module loader to decide where
# this module runs -- confirm against the loader.
ccs_mod_type = CCSD_SERVER
|---|
| 40 | |
|---|
# Default number of seconds between two status checks of the same host.
DEFAULT_CHECK_INTERVAL = 3 * 60

# Common prefix of every OID queried by this module
# (1.3.6.1.4.1 is the IANA "private enterprises" subtree).
_OID_PREFIX = (1, 3, 6, 1, 4, 1, 15120)

# SNMP OIDs requested from each host by ccs_host_status.update().
# NOTE(review): all five OIDs currently have the same value, so the
# if/elif chain that matches response varbinds can only ever hit the
# CCS_REVISION_OID branch. This looks like a copy/paste placeholder; the
# correct load-average and uptime OIDs are not visible here -- confirm
# against the SNMP agent configuration.
CCS_REVISION_OID = _OID_PREFIX + (1, 3, 1)
LOAD_AVG1_OID = _OID_PREFIX + (1, 3, 1)
LOAD_AVG5_OID = _OID_PREFIX + (1, 3, 1)
LOAD_AVG15_OID = _OID_PREFIX + (1, 3, 1)
UPTIME_OID = _OID_PREFIX + (1, 3, 1)
|---|
| 48 | |
|---|
class ccs_status_error(ccsd_error):
    """Error raised by the status module (e.g. SNMP failures, unknown hosts)."""
|---|
| 51 | |
|---|
class ccs_host_status(ccs_class):
    """Monitors the status of a host in the system.

    One instance tracks a single host: whether it is reachable, which
    configuration revision it reports, its load averages and its uptime.

    Every attribute read goes through __getattribute__, which takes the
    instance's re-entrant lock, and update() holds the same lock for the
    whole refresh.

    Attributes set by this class:
        reachable               -- result of the last isHostUp() probe
        operatingRevision       -- revision string reported by the host
                                   (-1 until first fetched)
        operatingRevisionStatus -- "unknown", "ok", "warning" or "critical"
        operatingRevisionText   -- short description of the revision state
        operatingRevisionHint   -- longer explanation of the revision state
        currentLoad             -- (1 min, 5 min, 15 min) load averages
        uptime                  -- uptime value as returned via SNMP
        infoUpdatedAt           -- time of the last successful SNMP fetch
        lastCheckedAt           -- time of the last completed check
    """

    def __init__(self, session_id, host_id, interval):
        """Initialises a new class for a specified host.

        The specified session must be valid and have appropriate access to
        the database.

        session_id -- id of the session used to look up the host
        host_id    -- id of the host to monitor
        interval   -- check interval in seconds, used only to stagger the
                      first check (assumed to be an integer)

        Raises ccs_status_error if the session id is not valid.
        """

        self.lock = threading.RLock()

        session = getSession(session_id)
        if session is None:
            # The original raised ccs_host_error here, which is never
            # imported into this module; use this module's own error class.
            raise ccs_status_error("Invalid session id")
        self._session_id = session_id

        # See if the specified host id makes sense
        self.host = ccs_host(session_id, host_id)

        # Initialise the status information
        self.reachable = False
        self.operatingRevision = -1
        self.operatingRevisionStatus = "unknown"
        self.operatingRevisionText = "Unknown"
        self.operatingRevisionHint = "Status not yet fetched"
        self.currentLoad = (-1, -1, -1)
        self.uptime = 0
        self.infoUpdatedAt = 0

        # Pretend the last check happened between half an interval and a
        # full interval ago, so the first real check is due after a random
        # stagger of at most half the interval. This is naive staggering to
        # avoid checking every host at once.
        # Floor division keeps the bounds integral for random.randint().
        delay = random.randint(interval // 2, interval)
        # NOTE(review): `time` must be the time *module* here although the
        # file-level import is `from time import time`; presumably one of
        # the later star imports re-exports the module -- confirm.
        self.lastCheckedAt = time.time() - delay
        log_debug("Initialised host status monitor for %s with %d sec " \
                "stagger" % (self.host["host_name"], interval - delay))

    def update(self):
        """Called by the processing thread to trigger an update of the data.

        Probes the host with isHostUp(); if it is down only `reachable` and
        `lastCheckedAt` are updated. Otherwise an SNMP v1 GET for the
        revision, load average and uptime OIDs is sent to UDP port 161 of
        the host and the response is stored on the instance.

        The callbacks raise ccs_status_error when no response has been seen
        3 seconds after the request was sent, or when the agent reports an
        SNMP error. In those cases the timestamps are not updated
        (presumably the exception propagates out of runDispatcher()).
        """
        ip = self.host["ip_address"]

        # Hold the lock for the whole refresh so readers never observe a
        # half-updated status
        lock = self.lock
        lock.acquire()
        try:
            # Check that the host is reachable
            if not isHostUp(ip):
                self.reachable = False
                self.lastCheckedAt = time.time()
                return
            self.reachable = True

            # Build the SNMP v1 GET request for the status OIDs
            pMod = api.protoModules[api.protoVersion1]
            reqPDU = pMod.GetRequestPDU()
            pMod.apiPDU.setDefaults(reqPDU)
            pMod.apiPDU.setVarBinds(
                reqPDU, ((CCS_REVISION_OID, pMod.Null()),
                         (LOAD_AVG1_OID, pMod.Null()),
                         (LOAD_AVG5_OID, pMod.Null()),
                         (LOAD_AVG15_OID, pMod.Null()),
                         (UPTIME_OID, pMod.Null())
                        )
            )
            # Build message
            reqMsg = pMod.Message()
            pMod.apiMessage.setDefaults(reqMsg)
            pMod.apiMessage.setCommunity(reqMsg, 'public')
            pMod.apiMessage.setPDU(reqMsg, reqPDU)
            # Dispatch Request
            transportDispatcher = AsynsockDispatcher()
            transportDispatcher.registerTransport(
                udp.domainName, udp.UdpSocketTransport().openClientMode()
            )

            startedAt = time.time()

            def cbTimerFun(timeNow):
                # Give up once the request has been outstanding for more
                # than 3 seconds
                if timeNow - startedAt > 3:
                    transportDispatcher.jobFinished(1)
                    transportDispatcher.closeDispatcher()
                    raise ccs_status_error("Request timed out")

            def cbRecvFun(transportDispatcher, transportDomain, \
                    transportAddress, wholeMsg, reqPDU=reqPDU):
                # A datagram may carry several messages; decode them all
                while wholeMsg:
                    rspMsg, wholeMsg = decoder.decode(wholeMsg, \
                            asn1Spec=pMod.Message())
                    rspPDU = pMod.apiMessage.getPDU(rspMsg)
                    # Match response to request
                    if pMod.apiPDU.getRequestID(reqPDU) == \
                            pMod.apiPDU.getRequestID(rspPDU):
                        # Check for SNMP errors reported
                        errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
                        if errorStatus:
                            transportDispatcher.jobFinished(1)
                            transportDispatcher.closeDispatcher()
                            raise ccs_status_error(errorStatus.prettyPrint())
                        else:
                            la1 = 0
                            la5 = 0
                            la15 = 0
                            for oid, val in pMod.apiPDU.getVarBinds(rspPDU):
                                if oid == CCS_REVISION_OID:
                                    self.operatingRevision = str(val)
                                    self._parseOperatingRevision()
                                elif oid == LOAD_AVG1_OID:
                                    la1 = val
                                elif oid == LOAD_AVG5_OID:
                                    la5 = val
                                elif oid == LOAD_AVG15_OID:
                                    la15 = val
                                elif oid == UPTIME_OID:
                                    self.uptime = val
                            self.currentLoad = (la1, la5, la15)
                        transportDispatcher.jobFinished(1)
                return wholeMsg

            transportDispatcher.registerRecvCbFun(cbRecvFun)
            transportDispatcher.registerTimerCbFun(cbTimerFun)
            transportDispatcher.sendMessage(
                encoder.encode(reqMsg), udp.domainName, (ip, 161)
            )
            transportDispatcher.jobStarted(1)
            transportDispatcher.runDispatcher()
            transportDispatcher.closeDispatcher()
            # Record that the details were updated
            self.infoUpdatedAt = time.time()
            self.lastCheckedAt = time.time()
            log_info("Updated status details for %s" % self.host["host_name"])
        finally:
            lock.release()

    def _parseOperatingRevision(self):
        """Parses the incoming operating revision and discerns what it means
        about the status of the host.

        Reads self.operatingRevision and sets operatingRevisionStatus,
        operatingRevisionText and operatingRevisionHint. The revision itself
        is normalised: it is emptied when invalid, its last character is
        dropped when it contains "M", and only the part before the first
        ":" is kept when it contains ":".
        """

        if self.operatingRevision == -1 or self.operatingRevision == "":
            # Host doesn't know its own revision!
            self.operatingRevisionStatus = "critical"
            self.operatingRevisionText = "- Invalid State"
            self.operatingRevisionHint = "The host is in an invalid state"
            self.operatingRevision = ""
            return

        if "M" in self.operatingRevision:
            # "M" marks local modifications; the trailing character is
            # stripped (assumes the marker is the last character)
            self.operatingRevisionStatus = "warning"
            self.operatingRevisionText = "with manual changes!"
            self.operatingRevisionHint = "Manual changes risk being overwritten"
            self.operatingRevision = self.operatingRevision[:-1]
            return

        if ":" in self.operatingRevision:
            # "a:b" means files from more than one revision are present
            parts = self.operatingRevision.split(":")
            self.operatingRevisionStatus = "warning"
            self.operatingRevisionText = "mixed with r%s" % parts[1]
            self.operatingRevisionHint = "There are configuration files on " \
                    "the host that do not come from the active revision"
            self.operatingRevision = parts[0]
            return

        self.operatingRevisionStatus = "ok"
        self.operatingRevisionText = ""
        self.operatingRevisionHint = "The host configuration is up to date"
        return

    def __getattribute__(self, name):
        """Override the default accessor to put a lock around all accesses"""

        # Fetch the lock via object.__getattribute__ to avoid recursing
        # into this method
        lock = object.__getattribute__(self, "lock")
        lock.acquire()
        try:
            val = object.__getattribute__(self, name)
            return val
        finally:
            lock.release()
|---|
| 226 | |
|---|
def getHostStatus(host_name):
    """Returns the host status object for the specified host.

    host_name -- name of the host, as used as key in _statusInfo

    Raises ccs_status_error if no status object is registered for the host.
    """
    # Test membership on the dict itself; .keys() would build and scan a
    # list on Python 2
    if host_name not in _statusInfo:
        raise ccs_status_error("No status information for %s" % host_name)

    return _statusInfo[host_name]
|---|
| 235 | |
|---|
@catchEvent("shutdown")
def shutdownHandler(*args, **kwargs):
    """Clears the run flag that statusThread() tests on each loop iteration.

    Registered for the "shutdown" event via catchEvent; any arguments passed
    with the event are ignored.
    """
    global _runStatus
    _runStatus = False
|---|
| 240 | |
|---|
# Maps host name -> ccs_host_status instance; populated by statusThread()
# and read by getHostStatus()
_statusInfo = {}
# Set to False by shutdownHandler() to make statusThread() leave its loop
_runStatus = True
|---|
def statusThread():
    """Runs as a thread to keep the status information up to date.

    Creates a ccs_host_status object for every active host, then sweeps
    over them roughly every 30 seconds until _runStatus is cleared. A host
    is updated when more than `interval` seconds have passed since its
    lastCheckedAt. A failure to update one host is logged and does not
    stop the sweep; any other exception is logged and ends the thread.
    """
    global _statusInfo, _runStatus

    try:
        # What interval shall we check hosts at
        interval = config_get("status", "interval", DEFAULT_CHECK_INTERVAL)

        # Initialise the host status information (fetch the host list once)
        for host in getHostList(ADMIN_SESSION_ID):
            if not host["host_active"]:
                continue
            name = host["host_name"]
            _statusInfo[name] = ccs_host_status(ADMIN_SESSION_ID, \
                    host["host_id"], interval)

        # Loop until shutdown, reading status as appropriate. Start with
        # lastCheck far enough in the past to sweep immediately.
        lastCheck = time.time() - 31
        while _runStatus:
            # If less than 30 seconds since the last check, wait a bit.
            # The short sleep keeps the thread responsive to shutdown.
            if time.time() - lastCheck < 30:
                time.sleep(2)
                continue
            # See which hosts need checking now
            for host_name, host_status in _statusInfo.items():
                if not _runStatus:
                    # Shutdown requested; don't start further checks
                    break
                if time.time() - host_status.lastCheckedAt > interval:
                    try:
                        host_status.update()
                    except:
                        # One broken host must not stop the others
                        log_error("Failed to update status of %s" % \
                                host_name, sys.exc_info())
            # Record when we last ran some checks
            lastCheck = time.time()
    except:
        # Thread boundary: log whatever went wrong instead of dying silently
        log_error("Exception in status monitor thread!", sys.exc_info())

    log_info("Exiting status monitor thread")
|---|
| 281 | |
|---|
def ccs_init():
    """Starts the background status monitor by passing statusThread to
    initThread().

    NOTE(review): presumably invoked by the daemon when this module is
    loaded -- confirm against the module loader.
    """
    # Start the status information thread
    initThread(statusThread)
|---|