原创

webSocket多线程推送出错


错误信息

webSocket多线程推送出错,报错信息为The remote endpoint was in state [TEXT_PARTIAL_WRITING]

错误原因

两个方法有可能同时执行,当方法1或者方法2遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报上面的异常。

解决方案

采用的是将session存入2个数组,数组A放置空闲session,数组B存放忙碌中的session。何为忙碌?当前session进行send发送信息还没结束即为忙碌。

在发送信息时判断当前session是忙碌状态还是空闲状态:

如为空闲,直接执行发送并且把当前session在空闲(A)中移除,并把它放入忙碌(B)的数组中,当执行完成,将空闲的session从忙碌(B)中移除,并且存入空闲(A)数组中。

如为忙碌,则跳过发送,延迟递归调用本身发送方法,延迟后执行再作判断是当前session是否忙碌,递归直到空闲再发送这则数据

代码实现

声明静态变量集合用于存放空闲以及忙碌的session

 // 记录空闲Session集合
    private static CopyOnWriteArraySet<Session> idle = new CopyOnWriteArraySet<Session>();
    // 记录正在使用中Session集合,value为Future,表示使用情况
    private static ConcurrentHashMap<Session, Future<Void>> busy = new ConcurrentHashMap<Session, Future<Void>>();

发送数据

// 新增Session
    public static void open(Session session) {
        idle.add(session);
    }

    // 关闭Session
    public static void close(Session session) {
        idle.remove(session);
        busy.remove(session);
    }

    // 使用session发送消息
    public static void send(Session session, SocketMessage message, Integer timeout) throws InterruptedException {
        if (timeout < 0) { // timeout后放弃本次发送
            return;
        }
        if (idle.remove(session)) { // 判断session是否空闲,抢占式
           busy.put(session, session.getAsyncRemote().sendText(JSON.toJSONString(message)));
        } else {
            // 若session当前不在idle集合,则去busy集合中查看session上次是否已经发送完毕,即采用惰性判断
            synchronized (busy) {
                if (busy.containsKey(session) && busy.get(session).isDone()) {
                    busy.remove(session);
                    idle.add(session);
                }
            }
            // 重试
            Thread.sleep(100);
            send(session, message, timeout - 100);
        }
    }

添加session->客户端上线时操作

@OnOpen
    public void onOpen(Session session,@PathParam("param")String socketUserId) {
         System.out.println("有新的客户端连接了: "+session.getId());
        open(session);
    }

移除session->客户端掉线时操作

@OnClose
    public void onClose(Session session) {
        System.out.println("用户" + session.getId() + "离线");
        close(session);
    }

使用发送消息

try {
	send(session, "大家好,我是零三,很高兴认识大家,我的网址是web03.cn", 3000);
} catch (InterruptedException e) {
	e.printStackTrace();
}

总结

此方法可以很有效的解决低并发问题,这只是一种解决方案,当然可能有更优的解决方法,交给多线程的同步机制去执行,因为我个人用多线程无法实现,所以使用当前下策

多线程
socket
  • 作者:零三(联系作者)
  • 最后更新时间:2020-05-28 21:24
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 转载声明:来源地址 https://web03.cn