Source code for redis_timeseries

# -*- coding: utf-8 -*-

__author__ = 'Ryan Anguiano'
__email__ = 'ryan.anguiano@gmail.com'
__version__ = '0.1.8'


import calendar
import functools
import operator
from collections import OrderedDict
from datetime import datetime

try:
    import pytz
except ImportError:  # pragma: no cover
    pytz = None


__all__ = ['TimeSeries', 'seconds', 'minutes', 'hours', 'days']


seconds = lambda i: i
minutes = lambda i: i * seconds(60)
hours = lambda i: i * minutes(60)
days = lambda i: i * hours(24)


[docs]class TimeSeries(object): granularities = OrderedDict([ ('1minute', {'duration': minutes(1), 'ttl': hours(1)}), ('5minute', {'duration': minutes(5), 'ttl': hours(6)}), ('10minute', {'duration': minutes(10), 'ttl': hours(12)}), ('1hour', {'duration': hours(1), 'ttl': days(7)}), ('1day', {'duration': days(1), 'ttl': days(31)}), ]) def __init__(self, client, base_key='stats', use_float=False, timezone=None, granularities=None): self.client = client self.base_key = base_key self.use_float = use_float self.timezone = timezone self.granularities = granularities or self.granularities self.chain = self.client.pipeline()
[docs] def get_key(self, key, timestamp, granularity): ttl = self.granularities[granularity]['ttl'] timestamp_key = round_time(timestamp, ttl) # No timezone offset in the key return ':'.join([self.base_key, granularity, str(timestamp_key), str(key)])
[docs] def increase(self, key, amount, timestamp=None, execute=True): pipe = self.client.pipeline() if execute else self.chain for granularity, props in self.granularities.items(): hkey = self.get_key(key, timestamp, granularity) bucket = self._round_time(timestamp, props['duration']) self._hincrby(pipe, hkey, bucket, amount) pipe.expire(hkey, props['ttl']) if execute: pipe.execute()
[docs] def decrease(self, key, amount, timestamp=None, execute=True): self.increase(key, -1 * amount, timestamp, execute)
[docs] def execute(self): results = self.chain.execute() self.chain = self.client.pipeline() return results
[docs] def get_buckets(self, key, granularity, count, timestamp=None): props = self.granularities[granularity] if count > (props['ttl'] / props['duration']): raise ValueError('Count exceeds granularity limit') pipe = self.client.pipeline() buckets = [] bucket = self._round_time(timestamp, props['duration']) - (count * props['duration']) for _ in range(count): bucket += props['duration'] buckets.append(unix_to_dt(bucket)) pipe.hget(self.get_key(key, bucket, granularity), bucket) results = map(self._parse_result, pipe.execute()) return list(zip(buckets, results))
[docs] def get_total(self, *args, **kwargs): return sum([amount for bucket, amount in self.get_buckets(*args, **kwargs)])
[docs] def scan_keys(self, granularity, count, search='*', timestamp=None): props = self.granularities[granularity] if count > (props['ttl'] / props['duration']): raise ValueError('Count exceeds granularity limit') hkeys = set() prefixes = set() bucket = self._round_time(timestamp, props['duration']) - (count * props['duration']) for _ in range(count): bucket += props['duration'] hkeys.add(self.get_key(search, bucket, granularity)) prefixes.add(self.get_key('', bucket, granularity)) pipe = self.client.pipeline() for key in hkeys: pipe.keys(key) results = functools.reduce(operator.add, pipe.execute()) parsed = set() for result in results: result = result.decode('utf-8') for prefix in prefixes: result = result.replace(prefix, '') parsed.add(result) return sorted(parsed)
def _hincrby(self, pipe, hkey, bucket, amount): if self.use_float: pipe.hincrbyfloat(hkey, bucket, amount) else: pipe.hincrby(hkey, bucket, amount) def _parse_result(self, val): if self.use_float: return float(val or 0) else: return int(val or 0) def _round_time(self, dt, precision): rounded = round_time(dt, precision) # If precision is a multiple of 1 day, add timezone offset if self.timezone and precision % days(1) == 0: rounded_dt = unix_to_dt(rounded).replace(tzinfo=None) offset = self.timezone.utcoffset(rounded_dt).total_seconds() rounded = int(rounded - offset) dt = unix_to_dt(dt or tz_now()) dt_seconds = (hours(dt.hour) + minutes(dt.minute) + seconds(dt.second)) if offset < 0 and dt_seconds < abs(offset): rounded -= precision elif offset > 0 and dt_seconds >= days(1) - offset: rounded += precision return rounded # Deprecated
[docs] def record_hit(self, key, timestamp=None, count=1, execute=True): self.increase(key, count, timestamp, execute)
[docs] def remove_hit(self, key, timestamp=None, count=1, execute=True): self.decrease(key, count, timestamp, execute)
get_hits = get_buckets get_total_hits = get_total
def round_time(dt, precision): seconds = dt_to_unix(dt or tz_now()) return int((seconds // precision) * precision) def tz_now(): if pytz: return datetime.utcnow().replace(tzinfo=pytz.utc) else: return datetime.now() def dt_to_unix(dt): if isinstance(dt, datetime): dt = calendar.timegm(dt.utctimetuple()) return dt def unix_to_dt(dt): if isinstance(dt, (int, float)): utc = pytz.utc if pytz else None dt = datetime.fromtimestamp(dt, utc) return dt