关于长连接的一些介绍
长连接的应用场景非常的广泛,比如监控系统,IM系统,即时报价系统,推送服务等等。像这些场景都是比较注重实时性,如果每次发送数据都要进行一次DNS解析,建立连接的过程肯定是极其影响体验。
长连接的维护必然需要一套机制来控制。比如 HTTP/1.0 通过在 header 头中添加 Connection:Keep-Alive参数,如果当前请求需要保活则添加该参数作为标识,否则服务端就不会保持该连接的状态,发送完数据之后就关闭连接。HTTP/1.1以后 Keep-Alive 是默认打开的。
Netty 是 基于 TCP 协议开发的,在四层协议 TCP 协议的实现中也提供了 keepalive 报文用来探测对端是否可用。TCP 层将在定时时间到后发送相应的 KeepAlive 探针以确定连接可用性
Netty状态回调代码
class ClientHandler : ChannelInboundHandlerAdapter() {
/** 客户端请求的心跳命令 */
private val heartBeat =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8))
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
//接收到消息
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
//添加心跳
if (evt is IdleStateEvent) {
if (IdleState.WRITER_IDLE?.equals(evt.state())) {
ctx.writeAndFlush(heartBeat.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
}
super.userEventTriggered(ctx, evt)
}
override fun channelActive(ctx: ChannelHandlerContext) {
//连接成功回调
ctx.fireChannelActive()
}
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
//中途断开回调
}
override fun exceptionCaught(
ctx: ChannelHandlerContext,
cause: Throwable
) {
//连接抛出异常回调
cause.printStackTrace()
ctx.close()
}
override fun handlerRemoved(ctx: ChannelHandlerContext) {
super.handlerRemoved(ctx)
//与服务断开连接时的回调
NettyObserverManager.instance.notifyObserver(ctx)//观察者发送断开通知 代码在下面
}
}
添加回调的管理类
class SimpleChatClientInitializer :
ChannelInitializer<SocketChannel>() {
@Throws(Exception::class)
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
//心跳每次像服务端发送的频率
pipeline.addLast(IdleStateHandler(4, 4, 4, TimeUnit.SECONDS))
pipeline.addLast("decoder", StringDecoder())
pipeline.addLast("encoder", StringEncoder())
//添加自定义的监听回调对象
pipeline.addLast("handler", ClientHandler())
}
}
使用服务开启长连接
class NotifyService : Service(), ChannelFutureListener, NettyObserverListener {
private var channel: Channel? = null
private var host = "xxx.xxx.xxx"//与后台统一的IP地址
private val port = 0//与后台统一的端口号
private var nio: NioEventLoopGroup? = null
private val clientInitializer = SimpleChatClientInitializer()
private val grayServiceId = 1001
//当前服务对象
private val mBinder = ClientBinder()
override fun onBind(intent: Intent): IBinder {
return mBinder
}
override fun onCreate() {
super.onCreate()
//添加观察者
NettyObserverManager.instance.add(this)
init()
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
//设置前台服务提高优先级
if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) {
//Android4.3 - Android8.0,隐藏Notification上的图标
val innerIntent = Intent(this, GrayInnerService::class.java)
startService(innerIntent)
startForeground(grayServiceId, Notification())
} else {
//Android8.0以上app启动后通知栏会出现一条"正在运行"的通知
val channel = NotificationChannel(
"com.guanwei.pddemo", "notify",
NotificationManager.IMPORTANCE_HIGH
)
val manager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
manager.createNotificationChannel(channel)
val notification = Notification.Builder(
applicationContext,
"com.guanwei.pddemo"
).build()
startForeground(grayServiceId, notification)
}
return START_STICKY
}
/**
* 服务在后台灰色保活
*/
inner class GrayInnerService : Service() {
override fun onBind(intent: Intent?): IBinder? {
return null
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
startForeground(grayServiceId, Notification())
stopForeground(true)
stopSelf()
return super.onStartCommand(intent, flags, startId)
}
}
private fun init() {
Thread {
kotlin.run {
if (nio == null) {
nio = NioEventLoopGroup()
}
//开始连接
doConnect()
}
}.start()
}
private fun doConnect() {
try {
channel = Bootstrap().run {
//连接建立
group(nio)
channel(NioSocketChannel::class.java)
handler(clientInitializer)
connect(
host,
port
).sync().channel()
}
val map = HashMap<String, Long>()
map["id"] = SPUtils.getInstance().getLong("userId")
val json = JSONObject(map as Map<*, *>).toString()
sendMessage(json)
} catch (e: Exception) {
e.stackTrace
}
}
fun sendMessage(msg: String) {
//向服务端发送消息
channel!!.writeAndFlush(msg).addListener(this)
}
/**
* 连接失败回调
*/
override fun operationComplete(future: ChannelFuture) {
if (!future.isSuccess) {
//连接失败重新连接
//每三秒进行一次重连接
future.channel().eventLoop().schedule({
kotlin.run {
doConnect()
}
}, 3, TimeUnit.SECONDS)
} else {
LogUtils.a("连接成功")
}
}
override fun nettyObserverUpData(v: ChannelHandlerContext) {
//每三秒进行一次重连接
v.channel().eventLoop().schedule({
kotlin.run {
doConnect()
}
}, 3, TimeUnit.SECONDS)
}
override fun onDestroy() {
super.onDestroy()
//移除观察者
NettyObserverManager.instance.remove(this)
}
/**
* 与activity进行交互
*/
inner class ClientBinder : Binder() {
public fun getService(): NotifyService {
return this@NotifyService
}
public fun sendMsg(msg: String) {
sendMessage(msg)
}
}
}
封装观察者进行状态监听与通知
/**
* netty状态观察者接口
*/
interface NettyObserverListener {
/**
* 刷新操作
*/
fun nettyObserverUpData(v: ChannelHandlerContext)
}
/**
* netty观察者操作接口
*/
interface NettySubjectListener {
/**
* 添加
*/
fun add(nettyObserverListener: NettyObserverListener)
/**
* 通知内容
*/
fun notifyObserver(t: ChannelHandlerContext)
/**
* 删除
*/
fun remove(nettyObserverListener: NettyObserverListener)
}
class NettyObserverManager : NettySubjectListener {
/**
* 数据集合
*/
private val list = ArrayList<NettyObserverListener>()
/**
* 添加
*/
override fun add(nettyObserverListener: NettyObserverListener) {
list.add(nettyObserverListener)
}
/**
* 更新
*/
override fun notifyObserver(t: ChannelHandlerContext) {
//数据更新
for (ol in list) {
ol.nettyObserverUpData(t)
}
}
/**
* 移除
*/
override fun remove(nettyObserverListener: NettyObserverListener) {
list.remove(nettyObserverListener)
}
companion object {
val instance: NettyObserverManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
NettyObserverManager()
}
}
}
在Activity中开启、绑定服务
服务开启与绑定所需要的参数如何获取就不多讲了,不会的建议回去重新学一下子服务
//先开启服务
startService(iIntent)
//绑定服务
bindService(
iIntent,
myConnection,
Context.BIND_AUTO_CREATE
)
override fun onDestroy() {
super.onDestroy()
//服务解绑
unbindService(myConnection)
//关闭服务
stopService(iIntent)
}