# -*- coding: utf-8 -*-
"""
Copyright © 2017-2018 The University of New South Wales
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Except as contained in this notice, the name or trademarks of a copyright holder
shall not be used in advertising or otherwise to promote the sale, use or other
dealings in this Software without prior written authorization of the copyright
holder.
UNSW is a trademark of The University of New South Wales.
Created on Mon Sep 18 09:22:40 2017
"""
import time
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import multiprocessing
import logging
import threading
########
#
# Logging
#
########
# Module-level logger used throughout this file. A NullHandler is attached, as
# the logging HOWTO recommends for libraries, so this module produces no output
# of its own; the application decides where 'libgs-log' records go.
log = logging.getLogger(name='libgs-log')
log.addHandler(hdlr=logging.NullHandler())
############################################
#
# Alert class and
# specific derived alert types.
# A user can specify others if she wishes.
#
############################################
class Alert(object):
    """
    Base class for a monitored value tagged with an alert level.

    Wraps an arbitrary value in ``val``. Concrete alert types are subclasses
    that define the class attributes ``alertstr`` (a label, or None) and
    ``alertcode`` (an integer level). The bare base class defines neither and
    renders as an "Unknown Alert".
    """

    def __init__(self, val):
        self.val = val

    def __str__(self):
        if hasattr(self, 'alertstr'):
            # A falsy label (e.g. None) is shown as "NO".
            label = self.alertstr or "NO"
            return "'{}' ( {} alert<{}> )".format(self.val.__str__(), label, self.alertcode)
        return "'{}' Unknown Alert".format(self.val)

    def __repr__(self):
        return self.__str__()
class CriticalAlert(Alert):
    """Critical alert level: alertcode 100, labelled "CRITICAL"."""
    alertcode = 100
    alertstr = "CRITICAL"
class RedAlert(Alert):
    """
    Red alert level: alertcode 30, labelled "RED".

    Its code is also the default alert-logging threshold of
    Monitor.register_monitor().
    """
    alertcode = 30
    alertstr = "RED"
class OrangeAlert(Alert):
    """Orange alert level: alertcode 20, labelled "ORANGE"."""
    alertcode = 20
    alertstr = "ORANGE"
class GreenAlert(Alert):
    """Green alert level: alertcode 10, labelled "GREEN"."""
    alertcode = 10
    alertstr = "GREEN"
class NoAlert(Alert):
    """
    A plain value carrying no alert: alertcode 0 and no label (rendered as
    "NO"). The monitor wraps non-Alert return values of monitor functions in
    this class.
    """
    alertcode = 0
    alertstr = None
class MonitorItem(tuple):
    """
    Read-only snapshot of one monitor point (or parent heading).

    A tuple ordered as ``_fields`` that can additionally be indexed by field
    name (``item['value']``) and offers dict-like ``keys()`` / ``items()``.
    """
    _fields = ('name', 'value', 'alert', 'parent', 'age')

    # Types rendered with quotes in __str__ (basestring only exists on Python 2).
    try:
        _text_types = (basestring,)
    except NameError:
        _text_types = (str,)

    def __new__(cls, item):
        """
        Args:
            item: A mapping keyed by field name (e.g. a dict). Fields that are
                  missing, or cannot be looked up, are set to None.
        """
        values = []
        for f in cls._fields:
            try:
                values.append(item[f])
            except (KeyError, IndexError, TypeError):
                values.append(None)
        return super(MonitorItem, cls).__new__(cls, values)

    def __getnewargs__(self):
        # copy/pickle rebuild instances via __new__(cls, *__getnewargs__()).
        # tuple's default passes the raw tuple, which __new__ cannot look up
        # by field name, so hand back a field -> value mapping instead.
        return (dict(zip(self._fields, self)),)

    def __str__(self):
        def fmt(v):
            return "'{}'".format(v) if isinstance(v, self._text_types) else str(v)
        return '(' + ', '.join("'{}': {}".format(k, fmt(v)) for k, v in self.items()) + ')'

    def __repr__(self):
        return self.__str__()

    def keys(self):
        """Return the field names, in order."""
        return self._fields

    def items(self):
        """Yield ``(field_name, value)`` pairs in field order."""
        for f in self._fields:
            yield f, self[f]

    def __getitem__(self, item):
        """Index by field name (e.g. ``'value'``) or by position/slice."""
        if item in self._fields:
            item = self._fields.index(item)
        return super(MonitorItem, self).__getitem__(item)
############################################
#
# The monitor magic
#
############################################
class Monitor(object):
    """
    Class to provide functionality for monitoring.

    Monitor points are registered with :meth:`register_monitor` (normally via
    the :meth:`monitor` decorator) and polled from a background loop started
    with :meth:`start`. The latest value, alert code and poll times of every
    point are kept in the pandas DataFrame ``self._exec_map``. Parents
    registered with :meth:`register_parent` only group points for display.
    """

    def __init__(self,
                 workers=10,
                 tick=0.5,
                 tick_cb=lambda x: None,  # TODO: Perhaps get rid of tick callbacks.
                 default_dt=10):
        """
        Args:
            workers:    Maximum number of simultaneous threads to spawn in the ThreadPool
            tick:       The delay (seconds) between successive runs of the monitor loop.
                        Should be fairly small (< 1 sec).
            tick_cb:    A callback invoked with this Monitor at the end of every tick.
            default_dt: Default dt to apply to monitor generators (if dt not specified explicitly)
        """
        # Defaults
        self.default_dt = default_dt

        # Callbacks
        self.tick_cb = tick_cb
        self._callbacks = []  # <-- added with add_callback

        # Set up the executor
        self._executor = ThreadPoolExecutor(workers)  # <-- max threads we permit at a time

        # This is the tick
        self._tick = tick

        # Parent headings: {name: {'parent': parent_name_or_None}}
        self._parents = {}

        # The exec map keeps track of all the monitored values and their execution status and times
        self._exec_map = pd.DataFrame(
            columns=['exec_t', 'next_exec_t', 'name', 'parent', 'dt', 'gen',
                     'loglvl_values', 'loglvl_alerts', 'logthr_alerts', 'dependents', 'value', 'alert'])

        # A dictionary to map alert codes to human readable strings
        self._alertstr_map = {}

        # The monitoring loop runs in a separate thread (or process, see start())
        self._pthr = None
        self._abort = threading.Event()

    def _get_alertstr(self, key):
        """Return the human readable string for alert code ``key`` ('' if unknown)."""
        return self._alertstr_map.get(key, "")

    def __del__(self):
        # __init__ may not have completed, so don't assume the attributes exist.
        if getattr(self, '_pthr', None) is not None:
            self.stop()
        executor = getattr(self, '_executor', None)
        if executor is not None:
            executor.shutdown(wait=False)

    @property
    def alertcode(self):
        """
        The highest alert code over all monitor points, ignoring points that
        have not reported yet. 0 if no point has reported.
        """
        codes = self._exec_map['alert'].dropna()
        return max(codes) if len(codes) else 0

    def register_parent(self,
                        name,
                        parent=None):
        """
        A parent monitor is not really a monitor. It is merely the product
        of its children and exists for visualisation purposes only.
        Its alert status will always be the worst of its children

        Args:
            name:   Name of the parent heading.
            parent: Optional name of its own parent. It is registered at the
                    top level first if it does not exist yet.
        """
        # If parent hadnt been explicitly defined already, define it at the top level
        if parent is not None and parent not in self._parents:
            self.register_parent(parent)

        self._parents[name] = dict(parent=parent)

    def add_callback(self, callable):
        """
        Add a callable to be invoked each time a monitor function is polled.

        .. note::
            The callback will be executed in the same sub-thread that does the polling

        The prototype of the callable should be

            some_function(point_name, tstamp, exc, ret)

        where

        * point_name is the name of the monitor point that has been polled
          (the callable is invoked once per point name of the polled function),
        * tstamp is the unix timestamp at which the monitor function returned a value (obtained with time.time()),
        * exc is None if no exception happened, otherwise it is set to the exception that occurred.
        * ret is the return value for that point. Will always be an Alert object.

        Exceptions raised by the callable are logged and otherwise ignored.

        Args:
            callable: The callable
        """
        self._callbacks.append(callable)

    def to_gen_in_executor(self, fn, point_names, *args, **kwargs):
        """
        Turn ``fn`` into a generator that always returns immediately from next().

        Each cycle submits ``fn(*args, **kwargs)`` to the executor. While that
        call is in flight, next() returns None. Once it is done, next() returns
        a tuple ``(tstamp, exc, ret)`` where

        * tstamp is time.time() taken when fn returned (or raised),
        * exc is None, or the exception fn raised (a wrong number of return
          values is reported as an exception too),
        * ret is an Alert for a single point name, or a tuple of Alerts (one
          per point name). Plain values are wrapped in NoAlert.

        The following next() then submits a fresh call. Registered callbacks
        are invoked from the executor thread after every call.

        Args:
            fn:              The callable to poll.
            point_names:     Tuple of monitor point names; fn must return one value
                             per name (a tuple if there is more than one).
            *args, **kwargs: Arguments passed to fn on every call.

        Returns:
            The generator described above.
        """
        assert isinstance(point_names, tuple)
        numvals = len(point_names)

        #
        # Further wrap the call to catch any exceptions and timestamp
        # the output
        #
        def exc_wrapped(*fargs, **fkwargs):
            try:
                res = fn(*fargs, **fkwargs)
                # Check the type before calling len() so a non-tuple result for a
                # multi-point monitor yields this clear error rather than a TypeError.
                if numvals != 1 and not (isinstance(res, tuple) and len(res) == numvals):
                    lres = len(res) if isinstance(res, tuple) else 1
                    raise Exception("Unexpected number of returned values from monitor function. expected {} value(s), got {} value(s)".format(numvals, lres))
                exc = None
            except Exception as e:
                exc = e
                res = None

            if res is not None and numvals > 1:
                res = tuple(r if isinstance(r, Alert) else NoAlert(r) for r in res)
            elif not isinstance(res, Alert):
                res = NoAlert(res)

            tstamp = time.time()

            # Invoke callbacks with one Alert per point name
            per_point = res if isinstance(res, tuple) else (res,) * numvals
            for cb in list(self._callbacks):
                try:
                    for p, r in zip(point_names, per_point):
                        cb(p, tstamp, exc, r)
                except Exception as e:
                    # Not every callable has __name__ (e.g. functools.partial)
                    log.error("Error invoking callback function {}. {}: {}".format(
                        getattr(cb, '__name__', repr(cb)), e.__class__.__name__, e))

            return tstamp, exc, res

        def generator(*gargs, **gkwargs):
            while True:
                future = self._executor.submit(exc_wrapped, *gargs, **gkwargs)
                # Ensure a call returns immediately, and returns None if not ready
                while not future.done():
                    yield None
                yield future.result()

        # Forward the caller's arguments so fn receives them on every cycle.
        return generator(*args, **kwargs)

    #############
    #
    # Convenience decorators
    #
    #############

    def callback(self):
        """
        Decorator factory that registers the decorated function with
        :meth:`add_callback` and returns the function unchanged.

        Usage::

            @mon.callback()
            def on_poll(point_name, tstamp, exc, ret):
                ...
        """
        def decorator(fn):
            self.add_callback(fn)
            return fn
        return decorator

    def monitor(self, point=None, *args, **kwargs):
        """
        Creates a decorator that can be applied to any function to add it to be monitored.

        It will do two things:

        1) convert the function to a generator in which the function call is run in an executor, so that
           any call to the generator's next() will return immediately and not hold up execution. If
           the function has not finished the generator will return None. If it has completed, it will return
           the value as well as a timestamp and any potential exception in the format required for the monitoring
           loop.

        2) add the function to the Monitor class polling schedule.

        Args:
            point:           The name of the monitor point, or a tuple of names if the function returns
                             one value per name. If omitted it will use the function name
            *args, **kwargs: Remaining arguments of register_monitor() (positional ones map to dt, parent, ...
                             in order). They apply to every point name.

        Returns:
            Decorator. The decorated name is replaced by a factory that creates a new
            executor-backed generator for the function when called.
        """
        def decorator(fn):
            if point is None:
                point_names = (fn.__name__,)
            elif not isinstance(point, tuple):
                point_names = (point,)
            else:
                point_names = point

            def decorated(*args2, **kwargs2):
                # fn and point_names are passed positionally so that positional
                # args2 are forwarded to fn instead of colliding with them.
                return self.to_gen_in_executor(fn, point_names, *args2, **kwargs2)

            # Only the first point owns the generator; the rest are updated as its dependents.
            self.register_monitor(point_names[0], decorated(), *args, dependents=point_names[1:], **kwargs)
            for p in point_names[1:]:
                self.register_monitor(p, None, *args, **kwargs)

            return decorated

        return decorator

    def register_monitor(self,
                         name,
                         gen,
                         dt=-1,
                         parent=None,
                         loglvl_values=None,
                         loglvl_alerts=logging.DEBUG,
                         logthr_alerts=RedAlert.alertcode,
                         dependents=tuple(),
                         alert_exc=NoAlert):
        """
        This is a low level function to register a monitor generator with the monitor class.

        The format of the generator is quite specific, and it should therefore ideally have been created from
        a callable using the to_gen_in_executor() method. Using this method will ensure the generator returns
        values in the right format for the monitor class, and that it never blocks.

        Args:
            name:          The name of the monitor point (must be unique)
            gen:           The monitor generator (* see description above), or None for a point that is
                           only updated as another point's dependent
            dt:            The time interval in which to poll the monitored point. dt = None -> one shot,
                           dt < 0 -> use default_dt, dt = 0 -> every tick
            parent:        Assign a parent to the monitored point (for grouping purposes only)
            loglvl_values: If not None, any change in value will be logged with the logging level specified.
            loglvl_alerts: If not None, a change in alert level may be logged with the logging level specified
            logthr_alerts: The threshold for the alertcode above by which to log. Default = RedAlert.alertcode
            dependents:    Other monitors to update from same generator
            alert_exc:     Alert to set in case an exception is raised in monitor function. Default = NoAlert

        Raises:
            ValueError: If a monitor point called ``name`` is already registered.
        """
        # One-shot can be enabled by setting dt == None. Negative dt = unset, and defaults will be applied
        # Continuous running (ie every tick) is enabled by setting dt = 0
        if dt is None:
            dt = 1e11  # <-- large number to prevent it from running again
        elif dt < 0:
            dt = self.default_dt

        log.debug("Adding monitor point '{}'".format(name))

        # If parent hadnt been explicitly defined already, define it at the top level
        if parent is not None and parent not in self._parents:
            self.register_parent(parent)

        # Compare against the values: ``x in series`` would test the index labels.
        if name in self._exec_map['name'].tolist():
            raise ValueError("{} already exists. Names must be unique".format(name))

        row = dict(
            exec_t=None,
            next_exec_t=time.time(),
            name=name,
            dt=dt,
            gen=gen,
            parent=parent,
            dependents=dependents,
            loglvl_values=loglvl_values,
            loglvl_alerts=loglvl_alerts,
            logthr_alerts=logthr_alerts,
            alert_exc=alert_exc)

        # DataFrame.append() was removed in pandas 2.0; build a one-row frame
        # with all existing columns (missing ones become NaN) and concatenate.
        columns = list(self._exec_map.columns) + [c for c in row if c not in self._exec_map.columns]
        new_row = pd.DataFrame([row], columns=columns)
        if len(self._exec_map) == 0:
            self._exec_map = new_row
        else:
            self._exec_map = pd.concat([self._exec_map, new_row], ignore_index=True)

        # Make sure value/alert are of dtype=object so they can
        # accept values of different types
        for col in ('value', 'alert'):
            if self._exec_map[col].dtype != 'object':
                self._exec_map[col] = self._exec_map[col].astype('object')

        self._exec_map.sort_values('exec_t', inplace=True)

    def start(self, subprocess=False):
        """
        Start the polling loop in a daemon thread, or in a daemon
        multiprocessing.Process if ``subprocess`` is True. No-op if already running.

        NOTE(review): with subprocess=True the loop runs in a separate process on its
        own copy of this Monitor, so polled values are not visible in this process.
        """
        if self._pthr is not None:
            return

        # Clear a stale abort flag so the monitor can be restarted after stop().
        self._abort.clear()

        if subprocess:
            self._pthr = multiprocessing.Process(target=self._polling_loop)
        else:
            self._pthr = threading.Thread(target=self._polling_loop)
        self._pthr.daemon = True
        self._pthr.start()

    def stop(self):
        """Stop the polling loop and wait for it to finish. No-op if not running."""
        if self._pthr is None:
            return
        self._abort.set()
        if isinstance(self._pthr, multiprocessing.Process):
            # The child process holds its own copy of the abort event and never
            # sees it being set here, so it must be terminated explicitly.
            self._pthr.terminate()
        self._pthr.join()
        self._pthr = None

    def _polling_loop(self):
        """Poll due monitors and call tick_cb once per tick until stop() is called."""
        while not self._abort.is_set():
            try:
                self._poll_due()
            except Exception as e:
                # Keep the loop alive: one faulty monitor must not stop all polling.
                log.error("Error polling monitors. {}: {}".format(e.__class__.__name__, e))

            # TODO: Generalise this to be able to call add_cb and add any number of callbacks at different rates
            try:
                self.tick_cb(self)
            except Exception as e:
                log.error("Error invoking tick callbak. {}: {}".format(e.__class__.__name__, e))

            # TICK. Waiting on the abort event lets stop() return without sitting out the tick.
            self._abort.wait(self._tick)

    def _poll_due(self):
        """
        Poll whatever is due.

        Points with a generator whose next_exec_t has passed are advanced once.
        A None result means the call is still in flight and is retried next tick;
        otherwise the point and its dependents are updated.

        NOTE(review): rows are addressed by index label from a snapshot, and
        register_monitor() renumbers labels (ignore_index) - registering while the
        loop runs may update the wrong row. Confirm registration happens before start().
        """
        #
        # Get the variables that are due for polling and loop through them
        # to poll.
        #
        due = self._exec_map[(~pd.isnull(self._exec_map['gen'])) &
                             (time.time() > self._exec_map['next_exec_t'])].sort_values('next_exec_t')

        # Try to get values for everything that is due
        for k, item in due.iterrows():
            val = next(item['gen'])

            # No value means the poller hasnt returned yet so just leave it to try again at the next tick
            if val is None:
                continue

            tstamp, exc, ret = val
            dependents = tuple(self._exec_map.at[k, 'dependents'])

            if exc is not None:
                if ret.val is not None:
                    raise Exception("Unexpected. It should not be possible to get an exception and return value at the same time. ret='{}', exc='{}'".format(ret, exc))
                alertclass = self._exec_map.at[k, 'alert_exc']
                ret = (alertclass("ERROR {}: {}".format(exc.__class__.__name__, exc)),) * (len(dependents) + 1)
            elif len(dependents) == 0:
                # Ensure that the result is always a tuple so that iteration works properly
                ret = (ret,)

            # Dependents are rescheduled with the polled point's dt
            dt = self._exec_map.at[k, 'dt']

            # populate the point itself and its dependents
            for k1, dep in enumerate((item['name'],) + dependents):
                # Look up the index of the dependent
                dep_k = self._exec_map.index[self._exec_map['name'] == dep]
                assert len(dep_k) == 1
                dep_k = dep_k[0]
                res = ret[k1]

                # Log against the row being updated, before it is overwritten
                self._log_value(dep_k, res.val)  # <-- TODO: Could be implemented using the generalised callback functionality we have
                self._log_alert(dep_k, res.alertcode)  # <-- TODO: Could be implemented using the generalised callback functionality we have

                # Update the dependent
                self._exec_map.at[dep_k, 'value'] = res.val
                self._exec_map.at[dep_k, 'alert'] = res.alertcode
                self._exec_map.at[dep_k, 'exec_t'] = tstamp
                self._exec_map.at[dep_k, 'next_exec_t'] = None if dt is None else tstamp + dt

                if res.alertcode not in self._alertstr_map:
                    self._alertstr_map[res.alertcode] = res.alertstr if res.alertstr is not None else ''

    def _log_value(self, k, val):
        """Log a change in value of row ``k`` if a value log level is set for it."""
        item = self._exec_map.loc[k]
        # loglvl may be None or NaN (after DataFrame concatenation) when unset
        if pd.isnull(item['loglvl_values']):
            return
        if item['value'] != val:
            log.log(int(item['loglvl_values']), 'Monitor: value "{}" --> {}'.format(item['name'], val))

    def _log_alert(self, k, alertcode):
        """
        Log a change in alert code of row ``k`` if an alert log level is set and
        either the new or the previous code is at/above the row's threshold.
        """
        item = self._exec_map.loc[k]
        if pd.isnull(item['loglvl_alerts']):
            return
        prev = item['alert']
        if prev == alertcode:
            return
        thr = item['logthr_alerts']
        # A point that has never been polled has no previous alert (None/NaN).
        prev_at_thr = (not pd.isnull(prev)) and prev >= thr
        if thr <= alertcode or prev_at_thr:
            log.log(int(item['loglvl_alerts']), 'Monitor: alert "{}" --> {}'.format(item['name'], alertcode))

    def __getitem__(self, item):
        """
        Look up a monitor point or parent heading by name.

        Note: Returns a copy of the entry so changing variables will not change it in the monitor

        Args:
            item: Name of a monitor point or registered parent.

        Returns:
            MonitorItem. ``age`` is the number of seconds since the point was last
            polled (None if never polled, and for parents).

        Raises:
            KeyError: If no point or parent has that name.
        """
        if item in self._parents:
            return MonitorItem({'name': item, 'parent': self._parents[item]['parent']})

        matches = self._exec_map[self._exec_map['name'] == item]
        if len(matches) > 1:
            raise Exception("Monitor name is not unique. This should not be possible")
        if len(matches) < 1:
            raise KeyError("Monitor name does not exist")

        itd = matches.iloc[0].to_dict()
        itd['age'] = None if pd.isnull(itd['exec_t']) else time.time() - itd['exec_t']
        return MonitorItem(itd)  # <-- TODO: Decide if we should return all fields or a subset

    def __iter__(self):
        """Yield ``(name, value, alert, age)`` per monitor point; age is None if never polled."""
        for _, row in self._exec_map.iterrows():
            age = None if pd.isnull(row['exec_t']) else time.time() - row['exec_t']
            yield row['name'], row['value'], row['alert'], age

    def __str__(self):
        alertcode = self.alertcode
        s = " Monitor "
        if alertcode > 0:
            s += "({}) ".format(self._get_alertstr(alertcode))
        s += "--- running\n" if self._pthr is not None else '--- not running\n'
        s += " {:<30s}{:<40s}{:<10s}{:<15s}\n".format("Name", "Value", "Alert", "Last polled")
        s += " " + "-" * 29 + " " + "-" * 39 + " " + "-" * 9 + " " + "-" * 15 + "\n"
        for item in self.itertree():
            # Depth in the parent hierarchy determines the indentation
            parent = item['parent']
            level = 0
            while not pd.isnull(parent):
                parent = self[parent]['parent']
                level += 1

            s += " {:30.29s}{:40.39s}{:10.9s}{:s}\n".format(
                '.' * (level + 1) + item['name'],
                '' if item['value'] is None else str(item['value']),
                '' if item['alert'] is None else self._get_alertstr(item['alert']),
                '' if item['age'] is None else '{:.2f}'.format(item['age']))

        return s

    def __repr__(self):
        return self.__str__()

    def itertree(self, parent=None):
        """
        Depth-first iteration over monitor points and parent headings (as
        MonitorItems) below ``parent`` (top level if None), siblings sorted by name.
        """
        if parent is None:
            top = self._exec_map['name'][pd.isnull(self._exec_map['parent'])].tolist()
            top += [k for k, v in self._parents.items() if v['parent'] is None]
        else:
            top = self._exec_map['name'][self._exec_map['parent'] == parent].tolist()
            top += [k for k, v in self._parents.items() if v['parent'] == parent]

        # An empty list simply ends the generator (raising StopIteration inside a
        # generator is a RuntimeError since Python 3.7, PEP 479).
        for k in sorted(top):
            yield self[k]
            for it in self.itertree(parent=k):
                yield it

    def keys(self):
        """
        Just include this since dicts etc all use keys to describe the access keys. Although in this class
        names is more descriptive. This method is the same as .names()

        Returns:
            List of monitor point names (parents are not included).
        """
        return list(self._exec_map['name'])

    def names(self):
        """Return the list of monitor point names (same as keys())."""
        return self.keys()
if __name__ == '__main__':
    # Small demonstration: register a few monitor points, start polling and
    # print the monitor table every few seconds until interrupted (Ctrl-C).
    import random
    from datetime import datetime

    import psutil

    from utils import setup_logger

    setup_logger(log, cons_loglvl=logging.DEBUG)

    mon = Monitor(workers=2, default_dt=2)

    # Call counters for the demo monitor functions (informational only).
    count = dict(a=1, b=1, c=1, d=1, e=1, f=1, current_time=1, cpu_usage=1)

    @mon.monitor(point=("a", "b", "c", "d", "e", "f"))
    def randmon():
        # One function feeding six monitor points: return one value per point.
        for k in count:
            count[k] += 1
        return tuple(random.random() for _ in range(6))

    @mon.monitor(parent='grandchild')
    def current_time(x=2):
        count['current_time'] += 1
        return datetime.utcnow()

    @mon.monitor(parent='parent')
    def cpu_usage():
        count['cpu_usage'] += 1
        usage = psutil.cpu_percent(interval=1)
        if usage > 50:
            return RedAlert(usage)
        return GreenAlert(usage)

    mon.start()

    # The polling loop runs in a daemon thread, which is killed as soon as the
    # main thread exits, so keep the main thread alive until interrupted.
    try:
        while True:
            time.sleep(5)
            print(mon)
    except KeyboardInterrupt:
        pass
    finally:
        mon.stop()