入职新公司也有一段时间了,最近刚好主导了媒资库索引的构建,场景也主要集中在用户维度的查询,例如:用户个人主页媒资列表,用户在这些场景,一定程度上更希望有某一维度序列(时间/权重等)的溯源。
不同于个性化推荐的场景,用户在这里看到的内容需要有一定的顺序性,也就涉及到分页方案的选型,现在就以微博个人主页这个场景作为示例,咱们看看都有哪些比较有意思的方案。
数据库设计
这里以mysql为例,为了保持微博内容库的高效率,我们划分出单独的索引库来维护索引数据(用户ID与微博ID关系),其中索引库冗余一份内容状态,表结构如下
create table `weibo_index`(
uid BIGINT NOT NULL DEFAULT 0 COMMENT '用户ID',
mid VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微博ID',
publish_time INT NOT NULL DEFAULT 0 COMMENT '发布时间 unix时间戳',
status tinyint NOT NULL DEFAULT 1 COMMENT '状态 1-正常 2-删除',
PRIMARY KEY(uid, mid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='索引表';
基于page分页
基于页数(page、count)分页可以说是最常见的分页方案,实现也非常简单,对应的SQL查询
SELECT mid FROM `weibo_index` WHERE uid = 1001 AND status = 1 ORDER BY publish_time DESC LIMIT {(page-1)*count}, {count}
为了保持查询的高效,我们为数据库追加一个索引
alter table `weibo_index` add index `idx_timeline`(uid, status, publish_time);
翻页查询过程也非常简单
我们每次根据不同的page,count计算出对应的偏移量,这种方案其实存在一定的弊端,那就是在用户翻页的过程中,列表数据很可能会发生更新,从而造成下一页出现“重复”微博的现象。
除此之外,在偏移量不断增大的情况下,即使存在“覆盖索引”,查询的效率也在逐步降低,因为本质上IO是在逐渐增大。
基于单cursor分页
分页方案的弊端,其实主要源自于偏移量的起始值是计算得来的,而不是唯一指定的,所以我们可以通过指定起始偏移量,也就是cursor的方式来避免上述的问题,对应的SQL查询
SELECT mid,publish_time FROM `weibo_index` WHERE uid = 1001 AND status = 1 AND publish_time < {last_time} ORDER BY publish_time DESC LIMIT {count}
我们将最后一条mid对应的publish_time作为游标传递给下游,这样很好的避免了page分页的问题,同时因为索引查找的过程已经锁定了需要的行数据,查询效率也可以一直保持高效。
但是这种方案,又引来了新的问题,那就是假如我们的游标值也就是发布时间不唯一,例如:允许用户定时发布一些内容,如果程序不加特殊处理,那么就会存在同一uid,有多条微博的publish_time是一样的情况。
从上图可以看到,这种情况下,在翻页的过程中,会有数据“丢失”的现象出现。
既然发布时间不唯一,那我们是否可以以一个唯一的值来替代它,例如weibo_id,不过weibo_id的生成并不一定随时间有序,所以,综合一下我们可以使用publish_time和mid组合cursor模式来分页
基于多cursor分页
多cursor分页模式,查询语句也要做些调整
SELECT mid,publish_time FROM `weibo_index` WHERE uid = 1001 AND status = 1 AND publish_time <= {last_time} ORDER BY publish_time DESC LIMIT {count+prefetchCount}
查询过程
我们通过冗余查询( publish_time <= {last_time})的方式,并且通过{last_mid}来定位要进行切割的位置,保证数据不会丢失,同时也需要多查一部分数据(prefetchCount),保证能够返回预期count的数据
这里prefetchCount取多少合适,我们可以统计现有的用户数据,或者从产品侧限制了用户定时同一时间发布内容的数量,来确定这个值,但是取值是否需要及时感知业务变化,还有查询性能,这些后期都需要我们重点关注。
基于当前时间发号ID分页
仔细分析一下无论是单cursor还是多cursor,根本问题在于每条索引缺少一个随时间递增且唯一的id,为了解决这个问题,我们为数据库增加一个自增id字段
alter table `weibo_index` add column `publish_time_id` bigint not null auto_increment;
查询语句也对应做下调整
SELECT mid,publish_time_id FROM `weibo_index` WHERE uid = 1001 AND status = 1 AND publish_time_id < {last_id} ORDER BY publish_time_id DESC LIMIT {count}
查询过程和“单cursor模式”类似,只不过游标由publish_time
调整为了publish_time_id
随着索引库的不断增大,访问不断增加,基于各种维度(查询IO/存储/CPU)的考量,我们需要分库分表,这样就需要一个发号器来生成全局的自增ID
发号器的选型也有很多成熟的方案,例如:mysql/snowflake等等,这里不再赘述。
这种方案其实并不是完美的,因为它实施需要有一个前提,那就是用户的发布时间和生成发号的时间,也就是入索引库的时间是一致的,这样发号的排序和用户的微博排序才是一致的,真实场景下其实是存在两者不一致的情况,例如:我们的数据源是从多处上游数据汇总过来的,当一路数据消费延迟或者出现故障,就会造成排序错乱。
再比如现在用户希望除了有时间序的内容,还希望微博能够按每篇的热度(泛指)排序,这和数据的入库时间更加没有关系。
基于索引顺序发号ID分页
回归问题的本源,我们想要的是在保证发号唯一的前提下,并且能保证发号值随对应索引值的单调性。所以我们可以选择基于本身期望的索引顺序,例如:时间序就是基于发布时间发号,热度序就是基于热度值发号,来完美解决这个问题。 实现方案也比较简单,我们以常见的snowflake发号器为例,发号器由64字节组成,包含41字节毫秒时间戳 + 10字节机器ID + 12位毫秒内自增ID,在此基础上做些改进,使用高位字节来表示预定义值,为了便于描述ID信息,使用14字节数组来存储发号值,并最终转化为长度为28的16进制字符串。
生成发号ID:
func (n *SnowFlake) Generate(pre int64) ID {
n.mu.Lock()
n.num = [numBytes]byte{} // reset array
now := time.Since(n.epoch).Nanoseconds() / 1000000
if now == n.time {
n.step = (n.step + 1) & n.stepMax
if n.step == 0 {
for now <= n.time {
now = time.Since(n.epoch).Nanoseconds() / 1000000
}
}
} else {
n.step = 0
}
n.time = now
n.pre = pre
// bigEndian int64 to custom size bytes
n.toBytes(n.preShift, PreBytes, n.pre)
n.toBytes(n.timeShift, TimeBytes, n.time)
n.toBytes(n.nodeShift, NodeBytes, n.node)
n.toBytes(n.stepShift, StepBytes, n.step)
// to hex string, id is the result variable, use bytes pool for 0 allocs/op
id := pool.Get().(*[]byte)
hex.Encode(*id, n.num[:])
pool.Put(id)
n.mu.Unlock()
return ID{id: *(*string)(unsafe.Pointer(id))} // nocopy
}
func (n *SnowFlake) toBytes(index, length int, v int64) {
for i := 0; i < length; i++ {
n.num[index+i] = byte(v >> ((length - i - 1) * 8))
}
}
有些美中不足的是,发号ID由于是28位字符串,相比于长整形的8字节,索引存储上有一定增加,在数据量较大的情况下,索引效率会有一定降级,不过随着数据量的逐步增大,我们的技术架构一定也是在逐步迭代升级,例如:适时的分库分表,缓存等;另外,除了生成发号,一些场景下我们也需要知道原值,因此需要能够依据发号ID反演原值:
func (n *SnowFlake) ParseString(id string) (int64, error) {
num := make([]byte, numBytes) // num is tmp variable, 0 heap allocs/op, unnecessary use bytes pool
if _, err := hex.Decode(num, []byte(id)); err != nil {
return 0, err
}
return n.toInt64(num, n.preShift, PreBytes), nil
}
func (n *SnowFlake) toInt64(num []byte, index, length int) int64 {
var v int64
for i := 0; i < length; i++ {
v |= int64(num[index+i]) << ((length - i - 1) * 8)
}
return v
}
完整代码参考pre发号器