优秀是一种习惯!!!

websocket使用问题汇总

2023-11-29

websocket使用问题

一、websocket推送功能中只使用session.isOpen()来判断连接是否建立成功可以推送是否可行?

WebSocket 推送功能中使用 session.isOpen() 判断连接是否建立成功是不够可靠的。虽然在大多数情况下,isOpen() 方法可以用来检查 WebSocket 连接是否打开,但它并不能保证连接的稳定性和可靠性。

在 WebSocket 建立连接期间,服务器和客户端需要完成一些握手过程,验证双方的身份,协调双方的协议选项等操作。只有在握手成功之后,才能确定 WebSocket 连接已经建立成功。因此,在发送消息前应该确保连接已经完全建立。

如果仅使用 session.isOpen() 来判断连接是否建立成功,可能会忽略掉一些异常情况,例如:

  • 客户端在握手期间关闭了连接。
  • 服务器在握手期间出现异常,导致连接未正常建立。
  • 网络故障或其他原因导致 WebSocket 连接断开。

为了确保 WebSocket 推送的可靠性,我们应该尽可能多地掌握连接状态的信息,并及时处理异常情况。可以考虑使用心跳机制、定时重连、异常监控等方式来提高连接的可靠性。同时,在推送之前,需要确保连接已经建立成功,并且数据发送成功后,还需要根据具体业务场景来确认接收方是否已经成功接收数据。

以下是一个使用心跳机制和定时重连的 WebSocket 推送示例代码:

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Timer;
import java.util.TimerTask;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

@ClientEndpoint
public class WebsocketClient {
  private Session session;
  private String serverUri;

  public WebsocketClient(String uri) {
      this.serverUri = uri;
  }

  @OnOpen
  public void onOpen(Session session) {
      System.out.println("WebSocket 已连接");
      this.session = session;
      startHeartbeat(); // 开始心跳
  }

  @OnMessage
  public void onMessage(String message) {
      System.out.println("收到消息:" + message);
  }

  @OnError
  public void onError(Throwable t) {
      System.err.println("WebSocket 连接出错:" + t.getMessage());
      stopHeartbeat(); // 停止心跳
  }

  @OnClose
  public void onClose(CloseReason reason) {
      System.out.println("WebSocket 已关闭:" + reason.getReasonPhrase());
      stopHeartbeat(); // 停止心跳
      reconnect(); // 重连
  }

  private Timer heartbeatTimer;
  private long HEARTBEAT_INTERVAL = 5000; // 心跳间隔,单位毫秒

  // 发送心跳消息
  private void sendHeartbeat() {
      try {
          session.getBasicRemote().sendText("heartbeat");
      } catch (Exception e) {
          System.err.println("发送心跳消息失败:" + e.getMessage());
      }
  }

  // 开始心跳
  private void startHeartbeat() {
      heartbeatTimer = new Timer();
      TimerTask task = new TimerTask() {
          @Override
          public void run() {
              if (!session.isOpen()) {
                  stopHeartbeat();
                  return;
              }
              sendHeartbeat();
          }
      };
      heartbeatTimer.schedule(task, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
  }

  // 停止心跳
  private void stopHeartbeat() {
      if (heartbeatTimer != null) {
          heartbeatTimer.cancel();
          heartbeatTimer = null;
      }
  }

  // 重连
  private void reconnect() {
      stopHeartbeat();
      while (true) {
          try {
              WebSocketContainer container = ContainerProvider.getWebSocketContainer();
              container.connectToServer(this, new URI(serverUri));
              System.out.println("WebSocket 重连成功");
              break;
          } catch (Exception e) {
              System.err.println("WebSocket 连接失败:" + e.getMessage());
              try {
                  Thread.sleep(5000); // 重连间隔,单位毫秒
              } catch (InterruptedException ie) {
                  System.err.println("休眠被中断:" + ie.getMessage());
              }
          }
      }
  }

  public static void main(String[] args) throws URISyntaxException, InterruptedException {
      String uri = "ws://localhost:8080/websocket";
      WebsocketClient client = new WebsocketClient(uri);
      client.reconnect(); // 首次连接
      Thread.sleep(Long.MAX_VALUE); // 阻塞主线程
  }
}

在上述代码中,我们使用了 javax.websocket 包提供的 WebSocket API 来进行 WebSocket 连接。在 onOpen 方法中启动了一个定时器,用于每隔一段时间发送心跳消息;在 onClose 方法中重新连接 WebSocket 服务器。如果 WebSocket 连接出错,则停止心跳并尝试重新连接。需要注意的是,心跳间隔和重连间隔可以根据具体业务场景来设置。

main 方法中,我们首先定义了 WebSocket 服务器的 URI,然后创建了一个 WebsocketClient 对象,并调用其 reconnect 方法来进行首次连接。由于 WebSocket 连接是异步的,因此我们使用 Thread.sleep(Long.MAX_VALUE) 来阻塞主线程,避免程序退出。

采用异常监控方式来提高连接的可靠性示例代码:

import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.websocket.*;

@ClientEndpoint
public class WebsocketClient {
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
private Session session = null;
private String serverUri = null;

public WebsocketClient(String serverUri) {
  this.serverUri = serverUri;
}

@OnOpen
public void onOpen(Session session) {
  this.session = session;
  sendHeartbeatMessage();
}

@OnMessage
public void onMessage(String message) {
  System.out.println("Received Message: " + message);
}

@OnError
public void onError(Throwable t) {
  System.out.println("WebSocket Error: " + t.getMessage());
  t.printStackTrace();
  reconnect();
}

@OnClose
public void onClose() {
  System.out.println("WebSocket closed");
  reconnect();
}

/**
   * 启动定时器,定时发送心跳消息
   */
  private void sendHeartbeatMessage() {
      Runnable task = () -> {
          try {
              if (session.isOpen()) {
                  session.getBasicRemote().sendText("heartbeat");
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
      };
      executorService.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
  }

  /**
   * 重新连接 WebSocket 服务器
   */
  private void reconnect() {
      executorService.schedule(() -> {
          try {
              container.connectToServer(this, new URI(serverUri));
          } catch (Exception e) {
              e.printStackTrace();
              reconnect();
          }
      }, 5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
      String uri = "ws://localhost:8080/websocket";
      WebsocketClient client = new WebsocketClient(uri);
      client.container.connectToServer(client, new URI(client.serverUri));

      // 阻塞主线程
      while (true) {
          if (!client.session.isOpen()) {
              throw new Exception("WebSocket disconnected");
          }
      }
  }
}

在上述代码中,我们使用 ScheduledExecutorService 来实现定时任务,定时发送心跳消息。当发生异常时,记录日志并尝试重新连接 WebSocket 服务器。reconnect 方法使用了延迟执行的方式来防止频繁重连。最后,在 main 方法中阻塞主线程,直到 WebSocket 连接断开。

需要注意的是,为了避免出现长时间阻塞的情况,我们需要设置超时时间或者使用定时器等方式定时检查 WebSocket 连接是否正常。另外,心跳间隔和重连间隔可以根据具体业务场景来设置。


标题:websocket使用问题汇总
作者:amethystfob
地址:https://newmoon.top/articles/2023/11/28/1701163165795.html

欢迎各路大侠指点留痕: