import logging
import itertools
import collections
import redis
import typing
from datetime import timedelta
from django.contrib import admin
from django.db.models import Q
from django.utils import timezone
from . import models
from . import client
logger = logging.getLogger(__name__)
def grouper(iterable, n, fillvalue=None):
    """Collect *iterable* into consecutive tuples of length *n*.

    The last tuple is padded with *fillvalue* when the number of items is
    not a multiple of *n*.

    :param iterable: any iterable; it is consumed lazily.
    :param n: size of every emitted tuple.
    :param fillvalue: padding used to complete the final tuple.
    :return: an iterator of ``n``-tuples.
    """
    # The *same* iterator object is repeated n times on purpose: zip_longest
    # pulls one item per position, so every tuple consumes n consecutive items.
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)
class Query:
    """Bare-bones stand-in for the ``query`` attribute of a queryset.

    It only carries an empty ``order_by`` and a back-reference to the
    queryset it was created for.
    """

    # No ordering is ever applied, so this stays an empty tuple.
    order_by = tuple()

    def __init__(self, queryset):
        """Remember the *queryset* this query object belongs to."""
        self.queryset = queryset
class Queryset:
    """Queryset look-alike that lists Redis keys instead of database rows.

    Keys are discovered with ``SCAN`` (``scan_iter``) on the slave
    connection and turned into ``model`` instances carrying the key type,
    expiry and idle time. Only lookups on ``key`` are supported (``exact``,
    ``startswith``, ``endswith`` and ``contains``).

    The object is mutable: ``filter``, ``order_by``, ``_clone`` and
    ``__getitem__`` all return ``self`` rather than a copy.
    """

    def __init__(self, model: models.RedisValue, slice_limit=101):
        """Create a queryset for *model*.

        :param model: model class; its ``_meta.model_name`` selects the
            master/slave connections through ``client``.
        :param slice_limit: upper bound for the ``SCAN`` batch size and the
            default number of keys fetched when no slice was requested.
        """
        self.slice_limit = slice_limit
        # Redis glob pattern used for SCAN; '*' matches every key.
        self.q: str = '*'
        self.filters: typing.List[Q] = []
        self.admin = admin
        self.model = model
        self._meta = model._meta
        self.master = client.get_master(model._meta.model_name)
        self.slave = client.get_slave(model._meta.model_name)
        # key -> model instance for the most recent scan (falsy == no cache).
        self._cache = None
        self._get_cache = None
        # Window of keys to fetch; set by __getitem__, defaulted in __iter__.
        self.slice = None
        self.ordered = True
        self.query = Query(self)

    def count(self):
        """Return ``len(self)``."""
        return len(self)

    def order_by(self, *args, **kwargs):
        """Accept and ignore ordering arguments; return ``self``."""
        return self

    def filter(self, *filters, **raw_filters):
        """Replace the current filters and derive the SCAN pattern.

        :param filters: ``Q`` objects.
        :param raw_filters: keyword lookups, wrapped into a single ``Q``.
        :return: ``self``.
        :raises AssertionError: for more than one lookup, a lookup on
            anything but ``key``, or an unsupported lookup type.
        """
        self._cache = dict()
        self._get_cache = None
        self.filters = list(filters)
        if raw_filters:
            self.filters.append(Q(**raw_filters))

        query = None
        for q_object in self.filters:
            path, args, kwargs = q_object.deconstruct()
            error = ('%r is not supported yet, please file a bug report on '
                     'https://github.com/WoLpH/redis_admin/issues/') % q_object

            # Depending on the Django version a single-lookup Q deconstructs
            # into ``args`` or into ``kwargs``; honour both. Underscore
            # prefixed kwargs (``_connector``/``_negated``) are Q options,
            # not lookups.
            lookups = list(args)
            lookups.extend(
                (name, value) for name, value in kwargs.items()
                if not name.startswith('_')
            )

            for lookup in lookups:
                # Can't have multiple filters with redis
                if query:
                    raise AssertionError(error)

                try:
                    key, value = lookup
                except (TypeError, ValueError):
                    # Nested Q objects and the like cannot be translated.
                    raise AssertionError(error) from None

                name, *suffix = key.split('__', 1)
                lookup_type = suffix[0] if suffix else None

                # Can't search for anything besides key with redis
                if name != 'key':
                    raise AssertionError(error)

                if lookup_type is None or lookup_type == 'exact':
                    query = value
                elif lookup_type == 'startswith':
                    query = '%s*' % value
                elif lookup_type == 'endswith':
                    query = '*%s' % value
                elif lookup_type == 'contains':
                    query = '*%s*' % value
                else:
                    raise AssertionError(error)

        self.q = query or '*'
        return self

    def __getattr__(self, key):
        """Log and reject every attribute that is not explicitly defined."""
        message = 'queryset.%s' % key
        logger.debug('unknown attribute requested: %s', message)
        raise AttributeError('Unknown attribute %s' % message)

    def _clone(self):
        """Return ``self``; no copy is made."""
        return self

    def __len__(self):
        """Return the number of matching keys.

        With filters this is the size of the (possibly freshly populated)
        result cache, so it is bounded by the current slice. Without filters
        it is the ``keys`` figure of ``INFO keyspace`` for the connection's
        database, falling back to 1000 when that figure is unavailable.
        """
        if self.filters:
            if not self._cache:
                # Drain the generator explicitly: ``list(self)`` would ask
                # for ``len(self)`` as a size hint and recurse into here.
                for _ in self.__iter__():
                    pass
            return len(self._cache)

        keyspace = self.slave.info('keyspace')
        db = self.slave.connection_pool.connection_kwargs.get('db', 0)
        return keyspace.get(f'db{db}', dict()).get('keys', 1000)

    def get(self, *args, **kwargs):
        """Return the first object matching the keyword lookups.

        The result is memoised on the queryset, so later calls return the
        same object until ``filter`` is called again.

        :raises: ``model.DoesNotExist`` when nothing matches and the model
            defines that exception; ``StopIteration`` otherwise.
        """
        if not self._get_cache:
            matches = iter(self.filter(**kwargs))
            try:
                self._get_cache = next(matches)
            except StopIteration:
                does_not_exist = getattr(self.model, 'DoesNotExist', None)
                if does_not_exist is None:
                    raise
                raise does_not_exist(
                    'No redis key matches %r' % (kwargs,)) from None

        return self._get_cache

    def __iter__(self):
        """Yield model instances for the keys inside the current slice.

        Results are served from the cache when it is populated. Otherwise
        the keys are scanned, their type/TTL/idle time and values are
        fetched through pipelines, and the cache is filled.
        Negative slice bounds are not supported (``itertools.islice``
        raises ``ValueError``).
        """
        logger.info('searching %r with query %r',
                    self.model._meta.model_name, self.q)

        if self._cache:
            yield from self._cache.values()
            return

        self.slice = window = self.slice or slice(self.slice_limit)
        start, stop, step = window.start, window.stop, window.step
        if stop is None:
            # Open-ended slice: never fetch more than slice_limit keys.
            stop = (start or 0) + self.slice_limit

        # SCAN's COUNT hint must be at least 1, even for an empty slice.
        batch_size = max(1, min(stop, self.slice_limit))
        keys_iter = self.slave.scan_iter(self.q, count=batch_size)
        keys = [
            key.decode()
            for key in itertools.islice(keys_iter, start, stop, step)
        ]

        now = timezone.now()
        with self.slave.pipeline() as pipe:
            # Three commands per key; the replies are regrouped below.
            for key in keys:
                pipe.type(key)
                pipe.pttl(key)
                pipe.object('IDLETIME', key)

            values = collections.OrderedDict()
            for key, result in zip(keys, grouper(pipe.execute(), 3)):
                type_, ttl, idle = result
                type_ = type_.decode()

                # PTTL is in milliseconds; non-positive means no expiry.
                if ttl > 0:
                    expires_at = now + timedelta(seconds=ttl / 1000)
                else:
                    expires_at = None

                # The idle time may be missing when the key disappeared
                # between the scan and this pipeline.
                if idle is not None and idle > 0:
                    idle_since = now - timedelta(seconds=idle)
                else:
                    idle_since = None

                values[key] = self.model.create(
                    key=key,
                    type=type_,
                    expires_at=expires_at,
                    idle_since=idle_since,
                )

        with self.slave.pipeline() as pipe:
            for key in keys:
                values[key].fetch_value(pipe)

            try:
                for key, raw in zip(keys, pipe.execute()):
                    values[key].raw_value = raw
                    try:
                        # Accessed only to surface decoding errors early.
                        values[key].value
                    except Exception as e:
                        logger.exception('Unable to decode: %r, error: %r',
                                         raw, e)
            except redis.ResponseError:
                # The pipelined fetch failed; fetch every value separately.
                for key in keys:
                    value = values[key]
                    value.raw_value = value.fetch_value(self.slave)

        self._cache = values
        yield from values.values()

    def __getitem__(self, index):
        """Restrict the queryset to *index* and return ``self``.

        :param index: a ``slice``, or an ``int`` which is treated as
            ``slice(0, index, 1)``.
        :raises TypeError: for any other index type.
        """
        if isinstance(index, int):
            # NOTE(review): an int selects the first ``index`` keys rather
            # than a single element; kept as originally written - confirm.
            requested = slice(0, index, 1)
        elif isinstance(index, slice):
            requested = index
        else:
            raise TypeError('Unsupported index type %r: %r' % (
                type(index), index))

        # Results cached for a different window must not be served for
        # this one.
        if self._cache and requested != self.slice:
            self._cache = dict()

        # __iter__ reads ``slice``; ``index`` is kept for compatibility.
        self.slice = self.index = requested
        return self
class RedisAdmin(admin.ModelAdmin):
    """Read-only admin that lists Redis keys through ``Queryset``."""

    show_full_result_count = False
    list_display = ['key', 'type', 'expires_at', 'ttl', 'idle',
                    'cropped_value', 'json', 'base64']
    search_fields = 'key__contains',

    # Keep everything read-only for now, saving isn't implemented yet
    readonly_fields = [f.name for f in models.RedisValue._meta.get_fields()]

    def get_queryset(self, request):
        """Return a Redis-backed queryset for this admin's model.

        The slice limit is one more than ``list_per_page`` - presumably so
        the changelist can tell that a further page exists (TODO confirm).
        """
        return Queryset(self.model, self.list_per_page + 1)
# Expose every configured server model in the admin through RedisAdmin.
for server_model in models.server_models.values():
    admin.site.register(server_model, RedisAdmin)