基于Kafka搭建数据分发中心

发布 : 2019-11-30 分类 : works 浏览 : --

正文的撰写竟然整整是两个月之后 =.=

适用场景

数据分发有很多成熟的方式,尤其是mysql的数据分发,分发方案更是各种各样,但这里提到的数据分发主要针对满足如下条件的分发场景:

  1. 幂等数据分发
  2. 结构化数据分发
  3. 表级别同步(mysql的表,redis的db,es的索引等亚实例级别)
  4. 易于搭建/重建子中心
  5. 一致的数据同步平台(即可同步mysql、redis、es等到对应数据仓储甚至相互混合)

实现思想

消费者通过Kafka获取可追溯历史的结构化数据

实现要点

通信协议

即在Kafka信道中数据传输的格式,有两种选择,二进制与文本格式,两种格式在使用上各有优势

二进制格式主要优势在于数据压缩和专有协议,即同样的数据进行分发,二进制格式传输时间更短,并且专有协议解析相较于文本格式更快速。不过缺点就是,在项目前期数据分发格式尚未稳定,修改二进制格式,就意味着生产者与消费者都需要进行调整,所以在整个数据中心搭建稳定之前,推荐使用文本协议,个人采用了 json 作为协议格式,主要目的是为了数据结构直观,并且多语言支持较完善

协议的内容需要根据不同的数据类型来确定,不过会有几个基本元素需要包含在协议中:

  1. 存储位置
  2. 存储动作
  3. 数据内容(实体)

针对不同的数据仓储,上述协议内容是不一样的。比如redis,需要通过db与key确定数据要存储的位置,存储动作包括 sethsetdelete 等,数据实体主要指对应的字符串、哈希,甚至包括过期时间等,但是不需要schema;对es而言,索引(index)可确定存储位置,存储动作一般只有创建(覆盖)数据、删除数据以及创建索引,数据实体一般为json;对mysql而言,存储位置通过db与table确定,存储动作主要是增、改、删,不过在数据实体中会包含比较复杂的匹配操作,这也让mysql的数据分发相较而言比较复杂。

针对mysql,数据分发协议可以简单可以复杂。可以完全仿照主从分发协议,分发一条一条的语句,这也让数据中心可以简单的变成解析bin-log,或者构造简单的sql语句即可。但是简单的处理方式在未来也许会导致复杂的结果,如果是解析bin-log,那么就需要适当的区分不同的db、table,并且,存在bin-log,意味着肯定有一台mysql服务器的存在,这台服务器一旦出现问题,接下来的问题就复杂了,降低了可用性,之后的故障转移和数据一致性都会成为问题。另一种简单方式是直接根据数据来源,生成对应的sql语句,跳过bin-log这一步,就没有之前的问题了,但加入了注入漏洞的风险,不过也因为有了这个思路,产生了下面的一种稍微复杂一点的处理方式。

即,将sql语句的模式与数据区分开来,在消费者一端将两者合并,当然,这种合并依赖 pymysql 的sql执行方式

1
cursor.execute("delete from db.table where name like %s and age > %s", ("Jone S", 10))

相当于

1
cursor.execute("delete from db.table where name like 'Jone S' and age > 10")

这也奠定了模式与数据分离的基础,为此,这里实现了一种简单的单表写入数据(增改删)的ORM框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# 将结构化数据转换为sql语句
from functools import reduce

OPERATOR_JOIN = {"and", "or"}
OPERATOR_SINGLE = {
"gt": lambda v: "> %s",
"gte": lambda v: ">= %s",
"lt": lambda v: "< %s",
"lte": lambda v: "<= %s",
"eq": lambda v: "= %s",

"like": lambda v: "like %v",
}
OPERATOR_RANGE = {
"in": lambda v: "in ({})".format(", ".join(("%s",) * len(v))),
"between": lambda v: "between %s and %s"
}

ACTION_INSERT = "insert"
ACTION_REPLACE = "replace"


def parse(tree):
"""
parse where condition dict
:param tree: dict
eg:
{
"and": {
"eq": {
"A": 1
},
"gt": {
"B": 2
}
}
}
direct translate for pymysql =>
where A = %s and B > %s
(1, 2)
which means =>
where A = 1 and B > 2
:return:
"""
templates = []
for k, v in tree.items():
if k in OPERATOR_JOIN:
tmp = parse(v)
templates.append(("({})".format(" {} ".format(k).join((t[0] for t in tmp))),
reduce(lambda x, y: x + y, [t[1] for t in tmp])))
else:
pair = v.items()
if k in OPERATOR_SINGLE:
templates.extend([("{} {}".format(p[0], OPERATOR_SINGLE[k](p[1])), [p[1]]) for p in pair])
elif k in OPERATOR_RANGE:
templates.extend([("{} {}".format(p[0], OPERATOR_RANGE[k](p[1])), p[1]) for p in pair])
else:
raise Exception("unexpected operator {}".format(k))
return templates


class Mysql(object):
def __init__(self, db, table):
self.table = "{}.{}".format(db, table)

@staticmethod
def _parse(tree):
p = parse(tree)
where, value = p[0] if p else ("", [])
return ("where " + where.strip("()")) if where else "", value

@staticmethod
def _columns(columns):
return ",".join("`{}`".format(k) for k in columns)

@staticmethod
def _values_group(values_group):
if not values_group:
raise Exception("No value")
size = len(values_group)
value_size = len(values_group[0])
return ",".join(["({})".format(",".join(("%s",) * value_size))] * size), \
reduce(lambda x, y: x + y, values_group)

def insert(self, columns, values, action=ACTION_INSERT):
"""
mysql insert one record
:param columns: ["col_A", "col_B", "col_C"]
:param values: [value_A, value_B, value_C]
:param action: default to be insert
:return:
"""
return self.batch_insert(columns, [values], action)

def replace(self, columns, values):
"""
mysql replace one record, similar to insert
:param columns:
:param values:
:return:
"""
return self.insert(columns, values, ACTION_REPLACE)

def batch_replace(self, columns, values_group):
"""
mysql batch replace multiple record
:param columns: ["col_A", "col_B", "col_C"]
:param values_group: [[value_A0, value_B0, value_C0], [value_A1, value_B1, value_C1]]
:return:
"""
return self.batch_insert(columns, values_group, ACTION_REPLACE)

def batch_insert(self, columns, values_group, action=ACTION_INSERT):
"""
mysql batch insert multiple record
:param columns: ["col_A", "col_B", "col_C"]
:param values_group: [[value_A0, value_B0, value_C0], [value_A1, value_B1, value_C1]]
:param action: default to by insert
:return:
"""
values_template, values = self._values_group(values_group)
return "{} into {} ({}) values {}".format(action, self.table, self._columns(columns), values_template),\
values

def update(self, condition, sets):
"""
update
:param condition: sql where clause
eg:
{
"and": {
"eq": {
"A": 2
}
}
}
:param sets: value set
eg:
{
"B": 4
}
:return:
"""
where, values = self._parse(condition)
pair = sets.items()
keys = ", ".join("`{}` = %s".format(i[0]) for i in pair)
return "update {} set {} {}".format(self.table, keys, where), \
[i[1] for i in pair] + values

def delete(self, condition):
"""
delete
:param condition: sql where clause
eg:
{
"and": {
"eq": {
"A": 2
}
}
}
:return:
"""
where, values = self._parse(condition)
return "delete from {} {}".format(self.table, where), values


if __name__ == '__main__':
# examples
mysql = Mysql("user", "hobby")
print(mysql.insert(["type", "name", "lasting_year", "real_hobby"], ["normal", "joking", 4, 0]))
print(mysql.replace(["type", "name", "lasting_year", "real_hobby"], ["normal", "joking", 4, 1]))
print(mysql.delete({
"and": {
"eq": {
"name": "not exist"
}
}
}))
print(mysql.update({}, {
"real_hobby": 1
}))

根据使用方式,上述ORM完全可以转换为其他类型的数据,将数据存储到es,甚至redis,这也是数据与模式分离的好处之一

生产者(数据分发中心)

针对有模式的数据,生产者的作用主要有两个

  1. 分发数据
  2. 维持模式

当然,无模式数据,如redis中的数据,并不需要维持模式

如果生产者需要维持模式,则需要寻找一个地方来存储模式,可以是文件,也可以是其他数据仓储,如mysql、redis等。

实际使用过程中,采用了mysql作为模式存储仓储,分为存储中间表以及元信息表

中间表存储了数据转换之后的数据,即真正需要分发的数据。因为实际生产过程中,原始数据如果直接进行分发,并且消费者需要根据整表数据进行数据聚合,即进行大数据分析,即使一条数据发生变更,都会导致大量的计算产生,消费者将不堪重负。而且存储一些必要的元信息,作为生产者进行数据聚合的依据,可以适当减少生产者的计算负荷,顺便也让各个消费者能获得统一的聚合数据

元信息表存储的信息如上一次分发到第几条数据,上一次分发时间戳,以及重要的数据模式(表结构或索引结构)

这也要求生产者本身需要保证数据的高可用,只是没有数据分发这个过程要求的那么高。

有一点比较复杂的地方是,需要确定自己是否需要拥有随时创建新的数据仓储的需求,如果有,那么实际情况就会相对变得比较复杂。

虽然Kafka中的消息理论上可以永久存储,但生产者生产的数据只会越来越多,占用的硬盘空间只会有增无减,当然,如果硬盘不是问题,并且不想把问题复杂化,可以到此为止。

为了让新的数据仓储能随时创建,意味着在Kafka信道中需要随时都有整个数据仓储的全部数据,但Kafka的消息会定时清除,所以需要根据过期时间,定期将中间表中已经发布的数据以及可能有的模式再一次发布到信道中,但是一旦发布到Kafka,所有的消费者都会收到消息,所以肯定需要额外的标志位来表明此次数据发布只为特殊情况使用,即使接收到了消息,也应该忽略消息。而新的数据仓储在建立时,则可以优先检查是否有这种类型的数据,优先将这些数据写入到本地存储中,再消费之后的数据。当然,这对数据更新不频繁或者存在数据固定时期的数据来说是没有问题的,如果是高频数据,这个操作可能会造成数据丢失或重复写入数据

消费者(数据订阅者)

简单的消费者当然只需要静静的解析Kafka信道中的数据然后执行即可,但其实可以做的更好。由于数据来源于Kafka,并且消费者的解析都是可控的,这就让整个数据从接收到写入仓库都受控,于是便可以以此建立数据监控系统,数据发布时延,消费成功与否,都可以建立相关的监控平台,让整个数据分发过程更加安心

本文作者 : hellflame
原文链接 : https://hellflame.github.io/2019/11/30/data-center-by-kafka/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
留下足迹
点击通过issue留言