In this blog, we discuss using Spark Structured Streaming with DataStax Enterprise (DSE) version 6.8.15 to process crypto trade information made available on a Kafka topic. The data sent to the Kafka topic comes from the CryptoCompare API, streamed over WebSockets and consumed using Akka.
The project can be run in two modes: in-memory mode and Kafka mode. In in-memory mode, the data streamed from the CryptoCompare WebSocket is consumed directly and the aggregations are displayed on the console. In Kafka mode, the data streamed over the WebSocket is consumed using Akka and then sent to a Kafka topic. A Spark job written in Scala listens on that Kafka topic and performs aggregations on the consumed data before the results are finally written to a Cassandra database.
The tech stack used is as follows:
WebSocket is a protocol that provides a bi-directional channel between the browser and the web server, usually run over an upgraded HTTP(S) connection. Data is exchanged in messages, either as binary data or Unicode text.
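As an aside, Akka HTTP (used later in this post) models these two kinds of WebSocket frames explicitly. The small sketch below is illustrative only and is not taken from the project code:
// Illustrative only: how Akka HTTP represents WebSocket frames as text or binary messages.
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}

def describe(msg: Message): String = msg match {
  case TextMessage.Strict(text) => s"strict text frame: $text" // full Unicode payload already in memory
  case _: TextMessage           => "streamed text frame"
  case _: BinaryMessage         => "binary frame"
}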
If running this project on Docker, take care to ensure the necessary ports are exposed in the relevant docker-compose.yml files for the Kafka container and the DSE container, both available on GitHub. The referenced ports include:
The GitHub repo for this project can be found here. You can follow along below, or use the link to follow along in the GitHub repo.
If you are not using the docker-compose.yml files provided in this demo to create the necessary containers, please ensure you have DSE version 6.8.15 running and a Kafka broker at version 2.2.1 installed. Your DSE installation must also have analytics enabled to allow use of the Apache Spark bundled with it.
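For reference, a minimal sketch of the sbt dependencies this setup implies is shown below. It is an assumption based on the spark-submit --packages lists used later in this post; check the build.sbt in the GitHub repo for the authoritative versions and modules.
// Hedged sketch only; confirm against the repo's build.sbt.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream"          % "2.5.32",
  "com.typesafe.akka" %% "akka-actor"           % "2.5.32",
  "com.typesafe.akka" %% "akka-http"            % "10.1.15",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.15",
  "org.apache.spark"  %% "spark-sql"            % "2.4.0" % Provided,
  "org.apache.spark"  %% "spark-sql-kafka-0-10" % "2.4.0"
)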
sbt clean package
docker exec -it dse-server /bin/bash
mkdir -p /opt/cryptocompare
exit
docker cp akka-websockets-spark-cassandra_2.11-0.1.jar dse-server:/opt/cryptocompare/
dse spark-submit --packages "com.typesafe.akka:akka-stream_2.11:2.5.32,com.typesafe.akka:akka-actor_2.11:2.5.32,\
com.typesafe.akka:akka-http_2.11:10.1.15,com.typesafe.akka:akka-http-jackson_2.11:10.1.15,com.typesafe.akka:akka-http-spray-json_2.11:10.1.15" \
--master 'local[*]' --conf spark.sql.shuffle.partitions=8 \
akka-websockets-spark-cassandra_2.11-0.1.jar --mode memory --timeout 150 <API_KEY>
This should start your streaming application and you should see results on your console.
To run in Kafka mode, you will need to create the needed topic as an additional step.
cd <<path/to/docker-compose.yml>>
docker-compose up
docker exec -it kafka /bin/bash
cd /opt/bitnami/kafka/bin/
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --topic cryptocompare
Alternatively, if you are running Kafka outside Docker, create the topic from your Kafka installation:
cd <<path/to/kafka_home>>/bin
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --topic cryptocompare
docker exec -it dse-server /bin/bash
To create the keyspace and table, run the commands below:
cqlsh
CREATE KEYSPACE IF NOT EXISTS cryptocompare WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE cryptocompare.trademsgs1minutewindow (
    date timestamp,
    window_start timestamp,
    window_end timestamp,
    market text,
    direction text,
    fromcoin text,
    tocurrency text,
    avgprice double,
    totalquantity double,
    totalvol double,
    counttxns bigint,
    uuid timeuuid,
    PRIMARY KEY ((date), window_end, window_start, direction, market)
) WITH CLUSTERING ORDER BY (window_end DESC, window_start DESC, direction ASC, market ASC);
dse spark-submit --packages "com.typesafe.akka:akka-stream_2.11:2.5.32,com.typesafe.akka:akka-actor_2.11:2.5.32,\
com.typesafe.akka:akka-http_2.11:10.1.15,com.typesafe.akka:akka-http-jackson_2.11:10.1.15,com.typesafe.akka:akka-http-spray-json_2.11:10.1.15,\
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,\
org.apache.kafka:kafka_2.11:2.2.1,org.apache.kafka:kafka-streams:2.2.1" \
--master 'local[*]' --conf spark.sql.shuffle.partitions=8 \
akka-websockets-spark-cassandra_2.11-0.1.jar --mode kafka --timeout 300 \
--kafkabroker <KAFKA_BROKER>:9092 --kafkatopic <KAFKA_TOPIC> --cassandraurl <Cassandra_Host> \
<API_KEY>
cqlsh
select * from cryptocompare.trademsgs1minutewindow where date = '2021-01-08 23:00:00+0000';
As mentioned earlier, the Akka framework is used to consume the messages streamed from the CryptoCompare API. Akka Streams defines the key components used to represent the WebSocket stream: the Source, the Sink, and the Flow. The connection is started by the code shown below.
val uri = Uri(s"wss://streamer.cryptocompare.com/v2?API_key=$APIkey")
val (upgradeResponse, notused) = Http().singleWebSocketRequest(WebSocketRequest(uri), websocketFlow(streamerDFActor, timeout))
The CryptoCompare API requires that, after the WebSocket connection is established, the client sends a message on the channel indicating which types of messages it wants streamed to it. Without this subscription request, trade messages won't be streamed to the subscriber.
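For illustration, the subscription request used later (WSMsgSubRequest) might look roughly like the sketch below when modelled with spray-json. The field names (action, subs) are assumptions based on CryptoCompare's SubAdd format; the actual case class lives in the repo.
// Hedged sketch of the subscription message; field names are assumptions.
import spray.json._
import DefaultJsonProtocol._

case class WSMsgSubRequest(action: String, subs: Seq[String])
implicit val wsMsgSubRequestFormat: RootJsonFormat[WSMsgSubRequest] = jsonFormat2(WSMsgSubRequest)

// Produces JSON such as: {"action":"SubAdd","subs":["0~Coinbase~BTC~USD"]}
println(WSMsgSubRequest("SubAdd", Seq("0~Coinbase~BTC~USD")).toJson.prettyPrint)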
To facilitate this bi-directional exchange, we use an ActorRef-backed Source materialised as a Publisher, which is then wrapped with Source.fromPublisher, as shown below.
def websocketFlow(streamerDFActor: ActorRef, timeout: Int) = {
  import akka.stream.scaladsl._

  // Materialise an ActorRef-backed Source as a Reactive Streams Publisher so we can
  // both push subscription messages into the socket and keep the stream open.
  val (actorRef: ActorRef, publisher: Publisher[TextMessage.Strict]) =
    Source.actorRef[String](bufferSize = 16, overflowStrategy = OverflowStrategy.dropNew)
      .map(msg => TextMessage.Strict(msg))
      .toMat(Sink.asPublisher(false))(Keep.both)
      .run()

  // Each text frame received from the WebSocket is handed to processWSJsonMsg.
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict => processWSJsonMsg(message.text, actorRef, streamerDFActor, timeout)
      case _ => // ignore other message types
    }

  Flow.fromSinkAndSource(printSink, Source.fromPublisher(publisher))
}
With this, message streaming starts and messages flow into our Sink, where they are processed.
The Source.fromPublisher API call creates a Source that remains open until the caller chooses to close the stream. This is very important, as our WebSocket connection must remain open indefinitely for the trade and ticker messages to keep flowing in.
The Sink in our case uses the spray-json library to attempt to parse each received message into a known, expected type. If an incoming message parses successfully as a WSMsgTrade, the message is sent on to the Kafka topic.
def processWSJsonMsg(json: String, actorref: ActorRef, streamerDFActor: ActorRef, timeout: Int) = {
  def asWsMsgStart(json: String) = json.parseJson.convertTo[WSMsgStart]
  def asWsMsgTrade(json: String) = json.parseJson.convertTo[WSMsgTrade]

  try {
    Try[Any](asWsMsgStart(json)).orElse(
      Try(asWsMsgTrade(json))) match {
      case Success(req: WSMsgTrade) =>
        logger.debug(s"$req")
        streamerDFActor ! req
      case Success(req: WSMsgStart) =>
        // Send a msg to start our subscription here
        actorref ! WSMsgSubRequest("SubAdd", Seq("0~Coinbase~BTC~USD", "0~Binance~BTC~USDT", "0~Kraken~BTC~USD", "0~CoinDeal~BTC~USD", "0~Gemini~BTC~USD")).toJson.prettyPrint
      case Success(x) =>
        throw new IllegalArgumentException(s"Unknown request type: $x")
      case Failure(e) =>
        throw e
    }
  } catch {
    case e: JsonParser.ParsingException =>
      logger.warn(s"Handled invalid message $json", e.summary)
  }
}
Spark Structured Streaming is started with format "kafka" to begin consuming the trade messages from the Kafka topic.
val kafkamsgstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkabootstrap)
  .option("subscribe", kafkatopic)
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS wstradejson")
  .select(from_json($"wstradejson", wstradeschema).as("wstrade")) // composite column (struct)
  .selectExpr("wstrade.*").as[WSMsgTrade]
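The wstradeschema referenced above is defined in the repo. A rough sketch of what such a schema could look like follows; the field names and types here are assumptions derived from the trade fields used elsewhere in this post, not the actual project code.
// Hedged sketch; the real field names/types live in the project's schema definition.
import org.apache.spark.sql.types._

val wstradeschema = StructType(Seq(
  StructField("market", StringType),
  StructField("fromcoin", StringType),
  StructField("tocurrency", StringType),
  StructField("direction", StringType),
  StructField("price", DoubleType),
  StructField("quantity", DoubleType),
  StructField("volume", DoubleType),
  StructField("eventtime", TimestampType)
))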
Aggregation transformations are performed on this Dataset using the flatMapGroupsWithState function, which lets us drop stale records based on the event time of each TradeMsg received.
val groupeddf = filtereddf
  .groupByKey(t => (t.market, t.fromcoin, t.direction, t.window))
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(avgTradeMsgWithWatermarkFn)
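The filtereddf above already carries a watermark on the trade's event time so that the event-time timeout can fire. A hedged sketch of how that could be applied is shown below; the column name eventtime and the five-minute threshold are assumptions, and the derivation of the per-minute window column is in the repo.
// Hedged sketch; the actual column name and watermark delay are defined in the repo.
val withwatermark = kafkamsgstream.withWatermark("eventtime", "5 minutes")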
def avgTradeMsgWithWatermarkFn(key: (String, String, String, (Timestamp, Timestamp)),
                               values: Iterator[TradeMsg],
                               state: GroupState[List[TradeMsg]]): Iterator[TradeMsgAvgByWindowPeriod] = {
  if (state.hasTimedOut) {
    // The watermark has passed this group's timeout; discard its state.
    state.remove()
    Iterator()
  } else {
    val groups = values.to[collection.immutable.Seq]
    val previous =
      if (state.exists) state.get
      else List()
    // Append the newly arrived trades to any previously accumulated state.
    val updatedstate = groups.foldLeft(previous) {
      (current, record) => current :+ record
    }
    state.update(updatedstate)
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs(), "5 minutes")
    stateToAverageEvent(key, state.get)
  }
}
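The stateToAverageEvent helper used above turns the accumulated trades for a key into the averaged output record. The real implementation is in the repo; the heavily simplified sketch below defines the case classes and logic purely as assumptions to show the shape of the computation.
// Hedged sketch only; the project's actual TradeMsg / TradeMsgAvgByWindowPeriod
// case classes and stateToAverageEvent implementation live in the repo.
import java.sql.Timestamp

case class TradeMsg(market: String, fromcoin: String, tocurrency: String, direction: String,
                    price: Double, quantity: Double, volume: Double,
                    window: (Timestamp, Timestamp)) // assumed shape

case class TradeMsgAvgByWindowPeriod(date: Timestamp, window_start: Timestamp, window_end: Timestamp,
                                     market: String, direction: String, fromcoin: String,
                                     avgprice: Double, totalquantity: Double, totalvol: Double,
                                     counttxns: Long) // assumed shape

def stateToAverageEvent(key: (String, String, String, (Timestamp, Timestamp)),
                        trades: List[TradeMsg]): Iterator[TradeMsgAvgByWindowPeriod] = {
  val (market, fromcoin, direction, (windowStart, windowEnd)) = key
  Iterator(TradeMsgAvgByWindowPeriod(
    date = windowStart, window_start = windowStart, window_end = windowEnd,
    market = market, direction = direction, fromcoin = fromcoin,
    avgprice = if (trades.nonEmpty) trades.map(_.price).sum / trades.size else 0.0,
    totalquantity = trades.map(_.quantity).sum,
    totalvol = trades.map(_.volume).sum,
    counttxns = trades.size.toLong))
}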
Finally, the output is written to a Cassandra table as shown below:
val query = windowperiod.writeStream
  .option("checkpointLocation", "checkpoint-kafka-cassandra")
  .foreachBatch((batch, batchid) => {
    batch.withColumn("uuid", makeuuids()).write
      .option("spark.cassandra.connection.host", cassandraurl)
      .cassandraFormat("trademsgs1minutewindow", "cryptocompare")
      .mode(SaveMode.Append)
      .save()
  })
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(15.seconds))
  .start()
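The makeuuids function used above supplies a time-based UUID for the table's timeuuid column. A hedged sketch of such a UDF, assuming the DataStax Java driver's UUIDs helper is available on the classpath, might look like this; the project's actual UDF may differ.
// Hedged sketch; generates a time-based UUID per row for the timeuuid column.
import org.apache.spark.sql.functions.udf
import com.datastax.driver.core.utils.UUIDs

val makeuuids = udf(() => UUIDs.timeBased().toString)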
In this post, we have discussed how to use Apache Spark to process streaming data from a Kafka broker, with messages streamed into it through a WebSocket channel.
As mentioned earlier, the CryptoCompare site requires that when a WebSocket connection is initiated, the client must send a request detailing what type of messages it is interested in. Care must be taken not to close the connection once this request succeeds, because the same socket stream carries the subsequent Trade/Ticker messages. When the client wishes to stop the messages from flowing in, a poison message must be sent to stop the stream and eventually close the connection.
Another point to note is the event time on the trade messages received. It is a good idea to filter out or drop stale messages from our aggregation, as determined by the event-time property on each trade message. We use Spark Structured Streaming's watermark to drop data that arrives too late, and to gain finer control over when and how this "too late" data is dropped, we also use the flatMapGroupsWithState function to perform stateful streaming.
Finally, the aggregated data is written to a Cassandra table using the foreachBatch function, which sends batched data to the table.
Challenges experienced while building the demo included working out how to stop both the WebSocket and the Spark streaming query after a certain timeout. This was handled by calling the ActorSystem scheduler's scheduleOnce method with the desired timeout. Once the timeout period elapses, a message is sent to stop the WebSocket from receiving further messages. Since the Spark streaming query is then the last daemon thread still running and no other threads remain, the application stops gracefully.
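A hedged sketch of that timeout handling is shown below. The names system (the ActorSystem) and actorRef (the actor materialised in websocketFlow) are assumptions, and the exact stop message used by the project is in the repo.
// Hedged sketch; completing the Source.actorRef stream stops further WebSocket messages.
import scala.concurrent.duration._
import akka.Done
import akka.actor.Status

implicit val ec = system.dispatcher
system.scheduler.scheduleOnce(timeout.seconds) {
  actorRef ! Status.Success(Done)
}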
Another challenge is remembering to clear out the checkpoint directory every time you change the transformation query between starts and stops of the streaming query. If you don't clear this folder, you will run into errors, because Spark remembers the last timestamp of the streaming messages before they stopped flowing. It saves the current transformation state in this checkpoint folder, so any change to the code renders that state invalid.
Having built the demo with Spark, Kafka, Akka WebSockets, and the CryptoCompare API, we see how easily these stacks can be combined to process live streaming messages from a WebSocket with Spark and push the results to a Cassandra table. As a future consideration, we could set up a live time-series graph in Grafana to display the records stored in the Cassandra table.
Does this stack make sense? Could you apply it to something else? Do you think there are any problems with it? Please feel free to share your thoughts below.
Cassandra.Link is a knowledge base that we created for all things Apache Cassandra. Our goal with Cassandra.Link was to not only fill the gap of Planet Cassandra but to bring the Cassandra community together. Feel free to reach out if you wish to collaborate with us on this project in any capacity.
We are a technology company that specializes in building business platforms. If you have any questions about the tools discussed in this post or about any of our services, feel free to send us an email!
Subscribe to our monthly newsletter below and never miss the latest Cassandra and data engineering news!