/**
 * [WebSocket.Listener] that keeps the connection alive with periodic pings and
 * reassembles fragmented text frames into complete messages.
 *
 * @property vertx Vert.x instance used to schedule the periodic ping.
 */
class BitfinexListener(val vertx: Vertx) : WebSocket.Listener {

    // One generator for the whole listener. Building a new seeded generator for
    // every ping would always yield the same first value, i.e. a constant "cid".
    private val pingIds = Random(2020)

    /** Fragments of the text message currently being received. */
    var parts: MutableList<CharSequence?> = mutableListOf()

    /** Completed once the final fragment of the current message has arrived. */
    var accumulatedMessage: CompletableFuture<*> = CompletableFuture<Any>()

    /**
     * Logs the opening of the socket and starts a periodic stream (period 60000,
     * milliseconds in the Vert.x timer API) that sends a JSON `ping` event.
     *
     * NOTE(review): the subscription created here is never disposed in this class,
     * so pings keep being attempted after the socket closes — confirm whether that
     * is handled elsewhere.
     *
     * @param webSocket the socket that was opened; pings are skipped when it is null.
     */
    override fun onOpen(webSocket: WebSocket?) {
        super.onOpen(webSocket)
        LOGGER.info("websocketopened")
        vertx.periodicStream(60000).toObservable()
            .subscribe { _ ->
                val pingTxt = JsonObject()
                    .put("event", "ping")
                    .put("cid", pingIds.nextInt())
                    .encode()
                webSocket?.sendText(pingTxt, true)
                    ?.thenRun { LOGGER.info("sentping{}", pingTxt) }
            }
    }

    /**
     * Collects one text fragment. When [last] is true the fragments are joined
     * (null fragments count as empty), the buffer is cleared, the pending future
     * is completed and replaced by a fresh one, and the full text is handed to
     * [onMessage].
     *
     * @param webSocket socket the fragment arrived on; one more message is requested from it.
     * @param message the fragment, possibly null.
     * @param last whether this fragment ends the current message.
     * @return the future tied to the message this fragment belongs to.
     */
    override fun onText(
        webSocket: WebSocket,
        message: CharSequence?,
        last: Boolean
    ): CompletionStage<*>? {
        parts.add(message)
        webSocket.request(1)
        if (!last) {
            return accumulatedMessage
        }
        // Copy the fragments into a String before completing the future.
        val completeMessage = parts.joinToString(separator = "") { fragment -> fragment ?: "" }
        parts.clear()
        accumulatedMessage.complete(null)
        val finished: CompletionStage<*> = accumulatedMessage
        accumulatedMessage = CompletableFuture<Any>()
        onMessage(completeMessage)
        return finished
    }

    /**
     * Decodes a fully reassembled message as JSON.
     *
     * @param message complete text of one websocket message.
     */
    fun onMessage(message: String) {
        val bitfinexMessage = Json.decodeValue(message)
        // ... see repo at github for full code
    }
}
/**
 * Verticle that opens a websocket to the Bitfinex public API and forwards every
 * [JsonObject] received on the `BITFINEX_EB_ADDRESS` event-bus address to it.
 */
class BitfinexConnection : AbstractVerticle() {

    /** The open websocket, or null until [start] has connected. */
    var webSocket: WebSocket? = null

    /**
     * Connects the websocket and registers the event-bus consumer.
     *
     * Messages that arrive while the socket is missing or its output is closed
     * are answered with a `503` status object instead of being sent.
     */
    override fun start() {
        LOGGER.info("deployingBitfinexConnection")
        // The BitfinexListener constructor takes the Vert.x instance only.
        val listener = BitfinexListener(vertx)
        val uri = URI.create("wss://api-pub.bitfinex.com/ws/2")
        // join() blocks the calling thread until the websocket handshake finishes.
        // NOTE(review): start() presumably runs on an event-loop thread — confirm
        // that blocking here is acceptable.
        webSocket = HttpClient.newHttpClient()
            .newWebSocketBuilder()
            .buildAsync(uri, listener)
            .join()
        vertx.eventBus()
            .consumer<JsonObject>(BITFINEX_EB_ADDRESS)
            .handler { jsonMsg ->
                // Snapshot the mutable property so the null check smart-casts.
                val socket = webSocket
                if (socket == null || socket.isOutputClosed) {
                    jsonMsg.reply(
                        JsonObject()
                            .put("message", "websocketclosed")
                            .put("statusCode", 503)
                    )
                    return@handler
                }
                val bitfinexMessage = jsonMsg.body().encode()
                socket.sendText(bitfinexMessage, true)
                    .whenComplete { _, failure ->
                        if (failure == null) {
                            LOGGER.debug("delivered{}", bitfinexMessage)
                        } else {
                            // Surface send failures instead of dropping them silently.
                            LOGGER.error("sendText failed", failure)
                        }
                    }
            }
    }
}
/**
 * Deploys [BitfinexConnection], then logs everything published on the
 * `ticker.tBTCUSD` event-bus address and sends a ticker subscription request
 * for that symbol to `BITFINEX_EB_ADDRESS`. A failed deployment is logged.
 */
fun main() {
    val vertx = Vertx.vertx()
    val symbol = "tBTCUSD"
    vertx.rxDeployVerticle(BitfinexConnection::class.java.name)
        .subscribe(
            { id ->
                LOGGER.info("deployedbitfinexconnection{}", id)
                val address = "ticker.$symbol"
                // Register the consumer before subscribing so no ticker update is missed.
                vertx.eventBus()
                    .consumer<JsonArray>(address)
                    .handler { jsonMsg ->
                        LOGGER.info("received{}{}", address, jsonMsg.body().encodePrettily())
                    }
                val subscribeMessage = JsonObject().bfxSubscribeTickerMessage(symbol)
                vertx.eventBus().send(BITFINEX_EB_ADDRESS, subscribeMessage)
            },
            { t: Throwable? -> LOGGER.error("deploymentfailed", t) }
        )
}