文章目录
- 如何解决高并发、限流、数据同步问题
- 1、如何解决高并发
- 2、OpenResty
- 2.1、安装openresty
- 3、广告缓存的载入与读取
- 3.1、Lua+nginx配置
- 3.2、Lua+nginx配置(从redis中获取数据)
- 3.3、添加openresty缓存
- 4、限流配置
- 4.1、控制速率
- 4.2、控制并发量(连接数)
- 5、数据同步的问题
- 5.1、配置master Mysql(开启binlog模式)
- 5.2、canal的安装
- 5.3、canal微服务的搭建
- 5.4、逻辑分析
- 5.5、代码编写
如何解决高并发、限流、数据同步问题
1、如何解决高并发
在开发一个项目的时候,首页门户系统需要展示各种各样的数据,如京东:
这些数据通常为变更频率低的数据,但是访问量却很高,我们可以利用多级缓存来解决这个问题,当然了也可以让网页做为静态页面,但是这样要是前端的数据需要进行变动,就需要将服务关闭,然后进行代码的修改,这样对用户的体验是极度不友好的。
按照我们的一般的思路,那么我们一般的服务结构是这么设计的:
但是由于首页的数据一般不会有太频繁的改动,所以对于用户的每一次请求都要去数据库进行查询访问是不太好的行为,所以我们可以利用缓存的方式。
设计思路:
1.首先访问nginx ,我们可以采用缓存的方式,先从nginx本地缓存中获取,获取到直接响应
2.如果没有获取到,再次访问redis,我们可以从redis中获取数据,如果有 则返回,并缓存到nginx中
3.如果没有获取到,再次访问mysql,我们从mysql中获取数据,再将数据存储到redis中,返回。
当然了,要是我们中间的逻辑使用java语言进行逻辑代码的编写,也是对速度来说有着一定的损失,因为当请求来到了之后,要经过servlet、spring、我们的controller层、service层、以及Dao层,最后还需要去操作数据库,这都是需要消耗时间的,这个时候我们可以利用LUA语言嵌入到程序中查询相关的业务。
至于什么是LUA语言,可以查看:Lua语言基础
我们在使用SpringBoot开发时,Tomcat服务器所支持的并发量通常为300-500
,而nginx的并发量正常情况下为2-3
万,但是OpenResty可以 快速构造出足以胜任 10K 以上并发连接响应的超高性能 Web 应用系统。
2、OpenResty
OpenResty(又称:ngx_openresty) 是一个基于 nginx的可伸缩的 Web 平台,由中国人章亦春发起,提供了很多高质量的第三方模块。
OpenResty 是一个强大的 Web 应用服务器,Web 开发人员可以使用 Lua 脚本语言调动 Nginx 支持的各种 C 以及 Lua 模块,更主要的是在性能方面,OpenResty可以 快速构造出足以胜任 10K 以上并发连接响应的超高性能 Web 应用系统。
360,UPYUN,阿里云,新浪,腾讯网,去哪儿网,酷狗音乐等都是 OpenResty 的深度用户。
OpenResty 简单理解成 就相当于封装了nginx,并且集成了LUA脚本,开发人员只需要简单的其提供了模块就可以实现相关的逻辑,而不再像之前,还需要在nginx中自己编写lua的脚本,再进行调用了。
2.1、安装openresty
linux安装openresty:
1.添加仓库执行命令
yum install yum-utils
yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
2.执行安装
yum install openresty
3.安装成功后 会在默认的目录如下:
/usr/local/openresty
3、广告缓存的载入与读取
3.1、Lua+nginx配置
需求:利用lua+nginx配置,nginx为入口,用户访问后,将mysql中的数据通过lua脚本存入到redis数据库中
实现思路:
- 连接mysql,按照广告的分类id读取广告列表,装换为json字符串
- 连接redis,将广告列表json字符串存入redis中。
在一个目录下创建一个lua脚本文件,在我这里为/root/lua
下创建update_content,目的就是连接mysql,查询数据,并存储到redis中。
lua脚本代码如下:
代码:
ngx.header.content_type="application/json;charset=utf8" -- 将请求头中的数据转为json格式
local cjson = require("cjson") -- 引入cjson
local mysql = require("resty.mysql") -- 引入mysql
local redis = require("resty.redis")
local uri_args = ngx.req.get_uri_args() -- 获取uri的参数
local id = uri_args["id"] -- 获取请求中的id参数
local db = mysql:new() -- 创建MySQL连接
db:set_timeout(1000) -- 设置超市时间
local props = {
host = "ip_address",
port = 3306,
database = "database",
user = "root",
password = "123456"
}
local res = db:connect(props)
local select_sql = "select url,pic from table where status ='1' and category_id="..id.." order by sort_order"
res = db:query(select_sql) -- 执行sql语句
db:close()
local red = redis:new()
red:set_timeout(2000)
local ip ="ip_address"
local port = 6379
red:connect(ip,port)
red:set("content_"..id,cjson.encode(res)) -- 存入redis中
red:close()
ngx.say("{flag:true}")
此时,我们在nginx中的配置如下:
在需要的服务中添加头信息,和 location信息(也就是lua脚本的路径)
此时我们可以访问该路径,看看数据是否存入到了redis中,这里我们使用redis远程连接客户端进行查看,查看之前redis数据库中数据:
此时访问路径ipaddress/update_content/?id=1
此时访问后放回数据为:
此时数据库的数据为:
3.2、Lua+nginx配置(从redis中获取数据)
此时我们通过3.1步我们可以将数据从mysql中取出来然后放入redis中
这一步我们需要将redis中的数据取出来,然后返回给前端。
在/root/lua
目录下创建read_content.lua:
--设置响应头类型
ngx.header.content_type="application/json;charset=utf8"
--获取请求中的参数ID
local uri_args = ngx.req.get_uri_args();
local id = uri_args["id"];
--引入redis库
local redis = require("resty.redis");
--创建redis对象
local red = redis:new()
--设置超时时间
red:set_timeout(2000)
--连接
local ok, err = red:connect("ipaddr", 6379)
--获取key的值
local rescontent=red:get("content_"..id)
--输出到返回响应中
ngx.say(rescontent)
--关闭连接
red:close()
同样的,我们需要配置nginx中的服务配置:
location /read_content{
content_by_lua_file /root/lua/read_content.lua;
}
此时我们访问路径ipaddress/read_content?id=1
返回的数据为:
也就是我们刚刚的从mysql中读出来的数据。
3.3、添加openresty缓存
如上的方式没有问题,但是如果请求都到redis,redis压力也很大,所以我们一般采用多级缓存的方式来减少下游系统的服务压力。参考基本思路图的实现。
先查询openresty本地缓存(需要开启)
如果 没有
lua_shared_dict dis_cache 128m; #nginx服务外配置 定义lua缓存命名空间极其大小
再查询redis中的数据,如果没有
再查询mysql中的数据,但凡有数据 则返回即可。
修改read_content.lua文件,代码如下:
ngx.header.content_type="application/json;charset=utf8"
local uri_args = ngx.req.get_uri_args();
local id = uri_args["id"];
--获取本地缓存
local cache_ngx = ngx.shared.dis_cache;
--根据ID 获取本地缓存数据
local contentCache = cache_ngx:get('content_cache_'..id);
if contentCache == "" or contentCache == nil then -- 如果在本地缓存中没有找到数据
local redis = require("resty.redis");
local red = redis:new()
red:set_timeout(2000)
red:connect("ipaddr", 6379)
local rescontent=red:get("content_"..id);
if ngx.null == rescontent then -- redis中没有数据
local cjson = require("cjson");
local mysql = require("resty.mysql");
local db = mysql:new();
db:set_timeout(2000)
local props = {
host = "ipaddr",
port = 3306,
database = "database",
user = "username",
password = "password"
}
local res = db:connect(props);
local select_sql = "select url,pic from table where status ='1' and category_id="..id.." order by sort_order";
res = db:query(select_sql);
local responsejson = cjson.encode(res);
red:set("content_"..id,responsejson);
ngx.say(responsejson);
db:close()
else -- redis中有数据,存入本地缓存中,时长为10分钟
cache_ngx:set('content_cache_'..id, rescontent, 10*60);
ngx.say(rescontent)
end
red:close()
else -- 如果本地缓存中有数据,那么直接返回
ngx.say(contentCache)
end
这个时候我们来访问一下
ipaddress/update_content?id=1
此时redis中的数据:什么都没有
访问后:
ipaddress/read_content?id=1
访问完update_content
的连接后,接下来应该就是从redis中取出数据了,我们可以访问该链接:
此时我们可以将redis中的数据删除,那么他应该就是从openresty中的缓存中拿取数据:
再次进行访问:
还是能获取到数据:
4、限流配置
一般情况下,首页的并发量是比较大的,即使 有了多级缓存,当用户不停的刷新页面的时候,也是没有必要的,另外如果有恶意的请求 大量达到,也会对系统造成影响。
而限流就是保护措施之一。
nginx提供两种限流的方式:
-
一是控制速率
-
二是控制并发连接数
4.1、控制速率
控制速率的方式之一就是采用漏桶算法。
(1)漏桶算法实现控制速率限流
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:
(2)nginx的配置
配置示意图如下:
修改/usr/local/openresty/nginx/conf/nginx.conf:
user root root;
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
#cache
lua_shared_dict dis_cache 128m;
#限流设置
limit_req_zone $binary_remote_addr zone=contentRateLimit:10m rate=2r/s;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
server {
listen 80;
server_name localhost;
location /update_content {
content_by_lua_file /root/lua/update_content.lua;
}
location /read_content {
#使用限流配置
limit_req zone=contentRateLimit;
content_by_lua_file /root/lua/read_content.lua;
}
}
}
配置说明:
binary_remote_addr 是一种key,表示基于 remote_addr(客户端IP) 来做限流,binary_ 的目的是压缩内存占用量。
zone:定义共享内存区来存储访问信息, contentRateLimit:10m 表示一个大小为10M,名字为contentRateLimit的内存区域。1M能存储16000 IP地址的访问信息,10M可以存储16W IP地址访问信息。
rate 用于设置最大访问速率,rate=10r/s 表示每秒最多处理10个请求。Nginx 实际上以毫秒为粒度来跟踪请求信息,因此 10r/s 实际上是限制:每100毫秒处理一个请求。这意味着,自上一个请求处理完后,若后续100毫秒内又有请求到达,将拒绝处理该请求.我们这里设置成2 方便测试。
测试:
我们可以限制给注释掉再看看。
这就很大程度上进行了限流
(3)处理突发流量
上面例子限制 2r/s,如果有时正常流量突然增大,超出的请求将被拒绝,无法处理突发流量,可以结合 burst 参数使用来解决该问题。
例如,如下配置表示:
上图代码如下:
server {
listen 80;
server_name localhost;
location /update_content {
content_by_lua_file /root/lua/update_content.lua;
}
location /read_content {
limit_req zone=contentRateLimit burst=4;
content_by_lua_file /root/lua/read_content.lua;
}
}
burst 译为突发、爆发,表示在超过设定的处理速率后能额外处理的请求数,当 rate=2r/s 时,将1s拆成2份,即每500ms可处理1个请求。
此处,**burst=4 **,若同时有4个请求到达,Nginx 会处理第一个请求,剩余3个请求将放入队列,然后每隔500ms从队列中获取一个请求进行处理。若请求数大于4,将拒绝处理多余的请求,直接返回503.
现象解释:
之所以会出现前六个中会有五个访问不成功,是因为我们设置了**burst=4 **,nginx处理了第一个请求之后,再将剩余3个请求将放入队列,其余的请求每隔500ms从队列中获取一个请求进行处理。
不过,单独使用 burst 参数并不实用。假设 burst=50 ,rate依然为10r/s,排队中的50个请求虽然每100ms会处理一个,但第50个请求却需要等待 50 * 100ms即 5s,这么长的处理时间自然难以接受,等待时间太久了。
因此,burst 往往结合 nodelay 一起使用。
例如:如下配置:
server {
listen 80;
server_name localhost;
location /update_content {
content_by_lua_file /root/lua/update_content.lua;
}
location /read_content {
limit_req zone=contentRateLimit burst=4 nodelay;
content_by_lua_file /root/lua/read_content.lua;
}
}
这下就是这个情况了,就不会出现延迟了:
4.2、控制并发量(连接数)
ngx_http_limit_conn_module 提供了限制连接数的能力。主要是利用limit_conn_zone和limit_conn两个指令。
利用连接数限制 某一个用户的ip连接的数量来控制流量。
注意:并非所有连接都被计算在内 只有当服务器正在处理请求并且已经读取了整个请求头时,才会计算有效连接。此处忽略测试。
配置语法:
Syntax: limit_conn zone number;
Default: —;
Context: http, server, location;
(1)配置限制固定连接数
配置如下:
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
# 设置nginx缓存 空间 为128M 缓存对象名称为dis_cache
lua_shared_dict dis_cache 128m;
limit_conn_zone $binary_remote_addr zone=addr:10m;
server {
listen 80;
server_name localhost;
location /brand/test{
limit_conn addr 2;
proxy_pass http://192.168.211.1:18081;
}
}
}
我们可以自己设定一个服务,该服务需要运行的时长为1s,然后nginx 反向代理到这个服务中:
@GetMapping("/test")
public Result testConne(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Result(true,StatusCode.OK,"ok");
}
表示:
limit_conn_zone $binary_remote_addr zone=addr:10m; 表示限制根据用户的IP地址来显示,设置存储地址为的内存大小10M
limit_conn addr 2; 表示 同一个地址只允许连接2次。
测试:
此时开3个线程,测试的时候会发生异常,开2个就不会有异常
5、数据同步的问题
如下图所示,虽然高并发的问题是得到了相对应的解决,但是当我们的管理员去更改数据的时候,我们怎么才能让redis知道我们已经改动了数据呢?所以这里就设计一个方法来处理我们的数据同步问题。
我们可以利用Mysql的主从,使用一个客户端,让主Mysql以为他是一个从Mysql,使用这个客户端去读取主Mysql的biglog,也就是二进制log,让这个客户端去监听主mysql,然后我们再利用java微服务去监听那些被修改的数据,要是监听到数据被更改,那么就进行数据同步的处理。
主要的结构流程如下图所示:
思路:
首先canal有两个角色:
- canal-server 服务端 伪装他自己是一个malserter的slave
- canal-client 客户端 用来监听canal-server客户端(java的客户端 处理数据以及业务逻辑)
canal搭建完之后的数据同步设计思路:
- 有个数据库,是一个master(需要进行配置)
- canal-server 是slave(伪装的)
- canal-client 进行监听canal-server
- 一旦数据库master发生数据的更新,canal-server就获取到数据,canal-client监听到数据变化,在客户端中代码实现,统一获取到数据进行同步。
搭建canal并实现监听数据的变化的步骤:
- mysql 需要开启binlog(master角色)
- mysql 创建一个账号 用于slave专门使用,授予权限slave的权限,进行远程连接
- 通过docker 安装canal-server
- 配置canal-server(配置,连接到的master的ip端口,以及自身的账号和密码以及要监听的数据库 和表有哪些)
- 搭建canal-client(java微服务:监听canal-server 获取被修改的数据,然后做业务处理:同步到redis中)
使用的是alibaba开发的项目:
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
项目地址:https://github.com/alibaba/canal
5.1、配置master Mysql(开启binlog模式)
(1) 连接到mysql中,并修改/etc/mysql/mysql.conf.d/mysqld.cnf 需要开启主 从模式,开启binlog模式。
执行如下命令,编辑mysql配置文件
命令行如下:
docker exec -it mysql /bin/bash
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf
修改mysqld.cnf配置文件,添加如下配置:
上图配置如下:
log-bin=/var/lib/mysql/mysql-bin
server-id=12345
(2) 创建账号 用于测试使用,
使用root账号创建用户并授予权限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
(3)重启mysql容器
docker restart mysql
5.2、canal的安装
下载镜像:
docker pull docker.io/canal/canal-server
容器安装
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。
执行代码如下:
docker exec -it canal /bin/bash
cd canal-server/conf/
vi canal.properties
cd example/
vi instance.properties
修改canal.properties的id,不能和mysql的server-id重复,如下图:
修改instance.properties,配置数据库连接地址:
这里的canal.instance.filter.regex
有多种配置,如下:
可以参考地址如下:
https://github.com/alibaba/canal/wiki/AdminGuide
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
配置完成后,设置开机启动,并记得重启canal。
docker update --restart=always canal
docker restart canal
5.3、canal微服务的搭建
由于官方没有提供springboot的依赖,我们就需要自定义起步依赖了。
也就是:https://github.com/chenqian56131/spring-boot-starter-canal
它主要提供了SpringBoot环境下canal
的支持,我们需要先安装该工程,在starter-canal
目录下执行mvn install
,如下图:
之后添加我们的启动依赖:
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
在启动类中启动canal注解:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableEurekaClient
@EnableCanalClient //在启动类中启动canal注解
@EnableFeignClients(basePackages = "com.yxinmiracle.content.feign")
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
编写配置文件:
server:
port: 18083
spring:
application:
name: canal
redis:
host: 192.168.211.132
port: 6379
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:7001/eureka
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true
ribbon:
eager-load:
enabled: true
ReadTimeout: 100000
#canal配置
canal:
client:
instances:
example:
host: 192.168.211.132
port: 11111
业务代码:
@CanalEventListener
public class MyEventListener {
@InsertListenPoint
public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
//do something...
}
@UpdateListenPoint
public void onEvent1(CanalEntry.RowData rowData) {
//do something...
}
@DeleteListenPoint
public void onEvent3(CanalEntry.EventType eventType) {
//do something...
}
@ListenPoint(destination = "example", schema = "canal-test", table = {"t_user", "test_table"}, eventType = CanalEntry.EventType.UPDATE)
public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
//do something...
}
}
测试代码(获取更新之前的数据,以及更新之后的数据)
@UpdateListenPoint
public void onEvent1(CanalEntry.RowData rowData) {
//do something...
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
System.out.println(column.getName()+" : "+column.getValue());
}
System.out.println("===================更新之后==================");
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
System.out.println(column.getName()+" : "+column.getValue());
}
}
其中需要注意的是:这个canal微服务启动可能会报错,原因是他有引用数据库的依赖,但是并没有连接数据库,所以我们需要进行排除,也就是在启动类上加上注解:
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
5.4、逻辑分析
- 当table被人写了之后
- canal-server端监听到了数据变化
- canal-client监听数据,并获得被修改的那个行记录中的id,执行一个sql语句获取那一行的数据
- 通过feign调用content微服务获取到最新的数据
- 先根据id查看redis中有无这个数据的值,如果有,那么就将那个条数据覆盖。如果没有,就直接添加
5.5、代码编写
这里我们使用ListenPoint的方法
代码:
/**
* destination 是linux中的目录 也是目的地址
* schema 数据库的库名
* table 要监听的表名
*
* @param eventType
* @param rowData
*/
@ListenPoint(destination = "example",
schema = "database",
table = {"tb_content",
"tb_content_category"},
eventType = {CanalEntry.EventType.UPDATE,
CanalEntry.EventType.DELETE,
CanalEntry.EventType.INSERT}
)
public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
// 判断
String categoryId = getColumnValue(eventType, rowData);
// 调用feign 获取更新后的数据
Result<List<Content>> result = contentFeign.findByCategory(Long.valueOf(categoryId));
// 得到广告数据
List<Content> data = result.getData();
// 存入redis中,进行数据覆盖
stringRedisTemplate.boundValueOps("content_"+categoryId).set(JSON.toJSONString(data));
}
/**
* 获取categoryId
* @param eventType
* @param rowData
* @return
*/
private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
// 1. 判断如果是insert 和 update 那么就获取after的数据
String categoryId = "";
if(eventType== CanalEntry.EventType.DELETE){
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
if (column.getName().equals("category_id")) {
categoryId = column.getValue();
break;
}
}
}else { // 如果是删除,那么就获取删除之前
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (column.getName().equals("category_id")) {
categoryId = column.getValue();
break;
}
}
}
return categoryId;
}
服务逻辑代码:
@GetMapping("/list/category/{id}")
public Result<List<Content>> findByCategory(@PathVariable(name = "id" ) Long id){
Content content = new Content();
content.setCategoryId(id);
List<Content> contentList = contentService.select(content);
return new Result<>(true, StatusCode.OK,"获取列表成功",contentList);
}
测试: