未加星标

关于用redis实现动态时间段内统计排序的想法

字体大小 | |
[数据库(综合) 所属分类 数据库(综合) | 发布者 店小二05 | 时间 2016 | 作者 红领巾 ] 0人收藏点击收藏
问题描述

需要根据某类数据在动态时间段内的统计值对这些数据进行排名。例如按过去24小时内点赞数排名的帖子,每隔一小时计算一次结果。以下描述均针对这个例子展开。

解决思路

针对这种问题,我的第一反应是直接通过 mysql 一张数据表记录所有数据的每一条统计值改变的行为,例如记下每个帖子在哪个时间点被谁点赞。排序结果直接通过 select + where + order_by + limit 。简单粗暴,但效率低下,扩展性差,而且当数据量很多时,会导致数据库查询效率低下。

那么为了提高效率, mysql 换成 redis 如何?将这些统计值改变的记录移到 redis 的 zset 中,为每个帖子建立一条 zset , zset 的每一条 member 代表一条点赞记录, field 为帖子id, score 记录点赞时间。利用 zcount 查询每个帖子有效时间段内的点赞总数。 redis 查询也快,看起来可行,但当帖子基数和点赞数很大时, zset 中成员量暴增,而且对于过期数据怎么处理?而且把点赞量换成点击量呢?就要记录每个用户对每篇帖子的点击行为了,这个方案是否合理?

仔细想一下这个问题,其实我们只需要关注帖子的点赞数量,而不是每条点赞行为。因此可以考虑利用 redis 记录帖子在每个时间点内的点赞数量,并且定期删掉过期的数据,例如记录每条帖子每个小时内的点赞数。这样就可以知道过去24小时内这个帖子的点赞总数了。具体的方案下面将展开。

数据结构

需要两个 redis 数据结构,分别存放 过程 和 结果 ,因为要定期过期一些数据,所以要通过 过程 记录数据统计值的历史记录,同时 结果 记录所有有效 过程 内的总体排序结果。

例如当前时间9:00,就需要知道当前9点到昨天9点的所有点赞数量,也就不需要昨天8点的点赞数了,所以结果里面需要加上今天9点的数据,减去昨天8点的数据。而随着时间往后推移,昨天9点到今天9点的数据也会依次被减掉,所以需要记录有效时间段内每个时间点的数据,这就是过程的作用了。结果的作用显而易见了,记录结果并支持排序。

过程利用 hash 结构存放在有效期内每个时间节点的所有帖子的统计值,例如建立: last_1, last_2, last_3 …… 等24个 key (因为是统计过去24小时内),每个 hash key 内记录该节点到下个节点的时间段内被点赞的帖子( field )及其点赞数( value ) 为什么采用 hash 结构而不是 list 或 set ,因为在定时更新统计数据过程中,需要先获取当前最新,再加一或减一, hash 的 hincrby 方法支持原子操作,可以在一个事务内完成这两个操作。试想在 list 结构下,对同一个帖子的多个并发点赞,可能导致数据错误。

结果利用 zset 结构存放所有帖子的统计总数,也就是排序结果。包括所有帖子( member )及其统计总数( value ) zset 支持记录 score 以及按 score 排序,也可以支持分页获取数据。

想法实现

基于以上过程和结果的redis结构,需要处理统计数据的更新,以及排序结果的定时更新两个关键流程。

以时间点为单位记录统计数据,当有更新时,只需要更新最近的时间点。而且要及时处理过期时间点。 关于过期时间点的清理时机的选择,想过两种方式,写一个定时器定时清理,或找准已有的请求时机触发清理。定时清理直观准确,如果项目中本来就有定时器组件,当然可以利用起来。我选择第二种方式,在统计数据更新时,如果要新增时间点了,就清理掉过期时间点。

排序结果只通过一个 zset 结构记录,注意及时清理 score 为0的数据。

from datetime import datetime, timedelta import redis TIME_SLOT = 'slot_{name}_{timestamp}' # hash,时间点,到前一个时间点的时间段内,所有数据的变化值 STATS_RESULTS = 'stats_{name}' # 统计结果,周期计算的统计值排序结果 LAST_SLOT = 'last_slot_{name}' # 记录最新的时间节点 redis_client = redis.Redis(host='127.0.0.1', port=6380) class DyStats(object): def __init__(self, stats_name, period, interval): """ :param stats_name: 被统计值名称, 必须唯一 :param period: 统计参数的有效期, 比如7天, 24小时 :param interval: 定时计算的周期, 如每隔一小时,每隔一天计算一次 """ assert interval in [1, 2, 3, 4, 6, 8, 12] or (interval >= 24 and interval % 24 == 0) assert period >= interval and period % interval == 0 self.stats_name = stats_name self.period = period # 以小时为单位 self.interval = interval # 以小时为单位 def incr_stats(self, target_id, amount=1): """ 统计数量加一 """ last_slot = self._last_slot redis_client.hincrby(TIME_SLOT.format(name=self.stats_name, timestamp=last_slot), target_id, amount) redis_client.zincrby(STATS_RESULTS.format(name=self.stats_name), target_id, amount) def get_stats_list(self, offset, limit, withscores=False): """ 获取排名结果 """ if withscores: return [(int(i), s) for i, s in redis_client.zrevrange(STATS_RESULTS.format(name=self.stats_name), offset, offset + limit, withscores)] else: return [int(i) for i in redis_client.zrevrange(STATS_RESULTS.format(name=self.stats_name), offset, offset + limit, withscores)] def remove_all_expired_slots(self): """ 删除所有的过期节点 """ slot_keys = redis_client.keys(TIME_SLOT[:11].format(name=self.stats_name) + '_*') now = datetime.now() for key in slot_keys: key = int(key.decode()[(6 + len(self.stats_name)):]) if key < (datetime(year=now.year, month=now.month, day=now.day, hour=now.hour) - timedelta(hours=self.period + self.interval)).timestamp(): self.remove_expired_slot(key) def remove_expired_slot(self, timestamp): """ 移除过期的时间节点 """ # 记录下要删除的节点中所有的统计值 slot_values = redis_client.hgetall(TIME_SLOT.format(name=self.stats_name, timestamp=timestamp)) # 删除过期节点 deleted = redis_client.delete(TIME_SLOT.format(name=self.stats_name, timestamp=timestamp)) # 减去统计结果中的过期值 if deleted: for key in slot_values.keys(): value = redis_client.zincrby(STATS_RESULTS.format(name=self.stats_name), key.decode(), -int(slot_values[key].decode())) # 删除统计结果中score为0的成员,这里可能出现incr操作,导致value>0,可能造成统计数据不准 # zremrangebyscore貌似能解决问题,但需要遍历zset中所有member,代价太大 # 考虑到排序问题对数据严格准确性要求不高,可以容忍 if value <= 0: redis_client.zrem(STATS_RESULTS.format(name=self.stats_name), key.decode()) @property def _last_slot(self): """ 最新slot """ last_slot = redis_client.get(LAST_SLOT.format(name=self.stats_name)) last_slot = int(last_slot.decode()) if last_slot is not None else None if last_slot is None: last_slot = self._set_first_slot() # last_slot一过期就删除所有已过期的slot if datetime.fromtimestamp(last_slot) + timedelta(hours=self.interval) <= datetime.now(): self.remove_all_expired_slots() # 设置最新时间槽 while datetime.fromtimestamp(last_slot) + timedelta(hours=self.interval) <= datetime.now(): last_slot = (datetime.fromtimestamp(last_slot) + timedelta(hours=self.interval)).timestamp() redis_client.set(LAST_SLOT.format(name=self.stats_name), int(last_slot)) return int(last_slot) def _set_first_slot(self): """ 设置初始slot 的timestamp """ now = datetime.now() first_slot = datetime(year=now.year, month=now.month, day=now.day, hour=now.hour).timestamp() redis_client.set(LAST_SLOT.format(name=self.stats_name), int(first_slot)) return int(first_slot) 总结

因为对redis还不是特别熟悉,最近也在看《redis实战》,感觉还没有完全利用好redis特性,本文的解决方法后续也许还有很多可以优化的点。 例如 remove_expired_slot() 方法中,删除节点时,如何处理并发情况下统计数据一致性的问题,是否可以通过 zremrangebyscore 或者事务解决?能否利用其它更简洁结构处理这类问题? 算是自己在redis学习过程中一篇小小的记录!

本文数据库(综合)相关术语:系统安全软件

主题: 数据数据结构RedisSUTI删除其实数据库
分页:12
转载请注明
本文标题:关于用redis实现动态时间段内统计排序的想法
本站链接:http://www.codesec.net/view/482756.html
分享请点击:


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 数据库(综合) | 评论(0) | 阅读(51)