区块链研究实验室|使用Java 11 WebSocket API的Websocket客户端

区块链研究实验室 2020-09-23 08:00
对于一个小型加密项目,我想使用Bitfinex WebSocket API实时获取市场数据。
从Java SE 11开始,JDK包含一个客户端WebSocket API。Javadoc包含一些代码示例,但是您不能立即使用这些示例。在网上搜索“ java websocket client”将主要显示有关旧JSR 352 websocket的示例和指南。设置一切以完成工作并不像预期的那样简单,因此我编写了这个小教程。
只要适合,我都会在项目中使用vert.x,并且也有一个websocket客户端API。但是可悲的是vert.x websocket客户端有一些缺点,它不适用于重定向☹尽管我们不使用vert.x websocket客户端,但我们将vert.x用作小型应用程序的基础。
够多了,我们开始编码。要构建并连接到Websocket服务器,构建器需要一个侦听器,该侦听器将侦听传入的数据包:

  classBitfinexListener(valvertx:Vertx):WebSocket.Listener{
overridefunonOpen(webSocket:WebSocket?){
super.onOpen(webSocket)
LOGGER.info("websocketopened")
this.vertx.periodicStream(60000).toObservable()
.subscribe{i->
valpingTxt=JsonObject().put("event","ping")
.put("cid",Random(2020).nextInt())
.encode()
webSocket?.sendText(pingTxt,true)?
.thenRun{->LOGGER.info("sentping{}",pingTxt)}
}
}
varparts:MutableList<CharSequence?>=
MutableList(0){index:Int->""}
varaccumulatedMessage:CompletableFuture<*>=
CompletableFuture<Any>()
overridefunonText(webSocket:WebSocket,
message:CharSequence?,
last:Boolean):CompletionStage<*>?{
parts.add(message)
webSocket.request(1)
if(last){
valcompleteMessage=parts.joinToString(separator="")
{charSequence->charSequence?:""}
parts.clear()
accumulatedMessage.complete(null)
valcf:CompletionStage<*>=accumulatedMessage
accumulatedMessage=CompletableFuture<Any>()
onMessage(completeMessage)
returncf
}
returnaccumulatedMessage
}
funonMessage(message:String){
valbitfinexMessage=Json.decodeValue(message)
//...seerepoatgithubforfullcode
}
}


我们重写onOpen方法以建立对bitfinex的定期ping。重要的一件事是,当您覆盖onOpen时,必须调用super.onOpen,否则客户端不会向服务器发送任何数据。
我花了很多时间才发现这个错误。由于侦听器是一个接口,所以我不习惯于调用接口的超级方法。但在这种情况下这很重要。java8中引入的接口中默认方法的概念对我来说还没有成为第二天性。
onText消息将收集所有传输的文本数据,直到文本完成为止(通常是一次调用inText的情况)。文本完成后,将通过onMessage方法中的vert.x事件总线发送文本。
现在,我们可以使用此侦听器设置一个表示与bitfinex的连接的顶点:

  classBitfinexConnection:AbstractVerticle(){
varwebSocket:WebSocket?=null
overridefunstart(){
LOGGER.info("deployingBitfinexConnection")
valsubs=vertx.sharedData()
.getLocalMap<Int,String>("bitfinex.subscriptions")
vallistener=BitfinexListener(this.vertx,subs)
valclient=HttpClient.newHttpClient()
valuri=URI.create("wss://api-pub.bitfinex.com/ws/2")
this.webSocket=client.newWebSocketBuilder()
.buildAsync(uri),listener).join()
vertx.eventBus()
.consumer<JsonObject(BITFINEX_EB_ADDRESS)
.handler{jsonMsg->
if(webSocket==null||webSocket?.isOutputClosed()!!){
jsonMsg.reply(JsonObject()
.put("message","websocketclosed")
.put("statusCode",503))
return@handler
}
valbitfinexMessage=jsonMsg.body().encode()
this.webSocket?.sendText(bitfinexMessage,true)?
.thenRun{LOGGER.debug("delivered{}",bitfinexMessage)}
}
}
}


该类非常简单明了,在verticle的start方法中,将初始化并启动与公共bitfinex api的websocket连接。应通过vert.x事件总线将消息直接发送到websocket,因此我们在地址BITFINEX_EB_ADDRESS上启动使用者。
在最后一步,我们将所有内容放在一起,并尝试订阅tBTCUSD代码:

  funmain(){
valvertx=Vertx.vertx()
valsymbol="tBTCUSD"
vertx.rxDeployVerticle(BitfinexConnection::class.java.name)
.subscribe(
{id->
LOGGER.info("deployedbitfinexconnection{}",id)
valaddress="ticker." symbol
vertx.eventBus()
.consumer<JsonArray>(address)
.handler{jsonMsg->
LOGGER.info("received{}{}",address,
jsonMsg.body().encodePrettily())
}
valsubscribeMessage=JsonObject()
.bfxSubscribeTickerMessage(symbol)
vertx.eventBus()
.send(BITFINEX_EB_ADDRESS,subscribeMessage)
},
{t:Throwable?->
LOGGER.error("deploymentfailed",t)}
)
}


main方法启动Vertx并部署bitfinex Websocket Verticle。部署完成后,我们将带有订阅有效负载的消息发送到websocket verticle,并在特殊的eventbus地址上启动使用者。
您可以在github上找到所有代码,对其进行克隆,以确保已安装Java 11或更高版本,并使用以下命令运行它:
./gradlewrun
我希望本教程将为您快速启动项目中的websocket客户端代码。

上一篇:返回列表

下一篇:通俗易懂的ETH(以太坊)详细介绍

相关阅读:

Copyright © 2013 比特巴手机版
币圈人都爱上的网站,新闻行情教程人物测评资讯大全