Streaming Data with scalaz-stream
Gary Coady
gcoady@gilt.com
Contents
• Why do we want streaming APIs?
• Introduction to scalaz-stream
• Use case: Server-Sent Events implementation
Why do we want streaming APIs?
Information with indeterminate/unbounded size
• Lines from a text file
• Bytes from a binary file
• Chunks of data from a TCP connection
• TCP connections
• Data from Kinesis or SQS or SNS or Kafka or…
• Data from an API with paged implementation
“Dangerous” Choices
• scala.collection.Iterable: provides an iterator to step through items in sequence
• scala.collection.immutable.Stream: lazily evaluated, possibly infinite list of values
Do The Right Thing
• Safe setup and cleanup
• Constant memory usage
• Constant stack usage
• Refactor with confidence
• Composable
• Back-pressure
• Creates co-data
• Safe resource management
• Referential transparency
• Controlled asynchronous effects
What is scalaz-stream?
[Diagram: user code, Process.await, “waiting” for callback, callback, user code]
// F: effect, O: output
sealed trait Process[+F[_], +O]
case class Halt(cause: Cause) extends Process[Nothing, Nothing]
case class Emit[+O](seq: Seq[O]) extends Process[Nothing, O]
case class Await[+F[_], A, +O](
  req: F[A],
  rcv: (EarlyCause \/ A) => Process[F, O]
) extends Process[F, O]
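To make the three constructors concrete, here is a toy model in plain Scala. It is a sketch, not the scalaz-stream implementation: the names `Proc`, `runLog`, `step` and `countdown` are made up for illustration, the effect is a plain `() => A` thunk rather than `F[A]`, `Either` stands in for `\/`, and `Emit` carries a `next` continuation to keep the interpreter short.

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

object ProcessSketch {
  // Toy stream type: either stop, emit some values, or wait for an effect.
  sealed trait Proc[+O]
  case object Halt extends Proc[Nothing]
  final case class Emit[+O](values: Seq[O], next: () => Proc[O]) extends Proc[O]
  final case class Await[A, +O](
      req: () => A,                          // the "effect" to run
      rcv: Either[Throwable, A] => Proc[O]   // what to do with its outcome
  ) extends Proc[O] {
    // Run the request and hand success or failure to the continuation.
    def step(): Proc[O] =
      rcv(try Right(req()) catch { case NonFatal(t) => Left(t) })
  }

  // Interpreter: drives the process to completion, collecting all output.
  @tailrec
  def runLog[O](p: Proc[O], acc: Vector[O] = Vector.empty): Vector[O] = p match {
    case Halt               => acc
    case Emit(values, next) => runLog(next(), acc ++ values)
    case a: Await[_, O]     => runLog(a.step(), acc)
  }

  // Example process: await a number, emit it, continue with the next one down.
  def countdown(n: Int): Proc[Int] =
    Await[Int, Int](() => n, {
      case Right(i) if i > 0 => Emit(Seq(i), () => countdown(i - 1))
      case _                 => Halt
    })
}
```

The point of the sketch is that user code only describes what to emit and what to wait for; the interpreter decides when the request actually runs.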
Composition Options
Process1[I, O]
  - Stateful transducer, converts I => O (with state)
  - Combine with “pipe”
Channel[F[_], I, O]
  - Takes I values, runs function I => F[O]
  - Combine with “through” or “observe”.
Sink[F[_], I]
  - Takes I values, runs function I => F[Unit]
  - Add with “to”.
Implementing
Server-sent Events (SSE)
This specification defines an API for
opening an HTTP connection for
receiving push notifications from a
server in the form of DOM events.
case class SSEEvent(eventName: Option[String], data: String)
Example streams

data: This is the first message.

data: This is the second message, it
data: has two lines.

data: This is the third message.

event: add
data: 73857293

event: remove
data: 2153

event: add
data: 113411
We want this type:
  Process[Task, SSEEvent]
“A potentially infinite stream of SSE event messages”
async.boundedQueue[A]
• Items added to queue are removed in same order
• Connect different asynchronous domains
• Methods:
  def enqueueOne(a: A): Task[Unit]
  def dequeue: Process[Task, A]
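The idea of connecting two asynchronous domains through a bounded queue can be sketched without scalaz-stream. The example below is an analogy, not the library's implementation: it uses `java.util.concurrent.ArrayBlockingQueue`, and the names `QueueBridgeSketch`, `drain` and `demo`, plus the use of `None` as an end-of-stream marker, are assumptions made for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object QueueBridgeSketch {
  // Consumer side: pull items in FIFO order until the end marker (None) arrives.
  def drain[A](queue: ArrayBlockingQueue[Option[A]]): Iterator[A] =
    Iterator
      .continually(queue.take())
      .takeWhile(_.isDefined)
      .collect { case Some(a) => a }

  def demo(): List[Int] = {
    // Capacity 2: the producer blocks on put() once two items are waiting,
    // so a slow consumer slows the producer down instead of growing memory.
    val queue = new ArrayBlockingQueue[Option[Int]](2)
    val producer = new Thread(() => {
      (1 to 5).foreach(i => queue.put(Some(i)))
      queue.put(None) // signal end of stream
    })
    producer.start()
    val received = drain(queue).toList
    producer.join()
    received
  }
}
```

Here the producer thread plays the role of the callback side and `drain` plays the role of `dequeue`; the bound on the queue is what provides back-pressure.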
HTTP Client Implementation
• Use AsyncHttpClient
• Hook into onBodyPartReceived callback
• Use async.boundedQueue to convert chunks into stream
def httpRequest(client: AsyncHttpClient, url: String):
    Process[Task, ByteVector] = {
  val contentQueue = async.boundedQueue[ByteVector](10)
  val req = client.prepareGet(url)
  req.execute(new AsyncCompletionHandler[Unit] {
    override def onBodyPartReceived(content: HttpResponseBodyPart) = {
      contentQueue.enqueueOne(
        ByteVector(content.getBodyByteBuffer)
      ).run
      super.onBodyPartReceived(content)
    }
  })
  contentQueue.dequeue
}
How to terminate stream?
req.execute(new AsyncCompletionHandler[Unit] {
  ...
  override def onCompleted(r: Response): Unit = {
    logger.debug("Request completed")
    contentQueue.close.run
  }
  ...
})
How to terminate stream with errors?
req.execute(new AsyncCompletionHandler[Unit] {
  ...
  override def onThrowable(t: Throwable): Unit = {
    logger.debug("Request failed with error", t)
    contentQueue.fail(t).run
  }
  ...
})
[Diagram: Step 1, Step 2, Step 3, with the types Process[Task, ByteVector], Process[Task, SSEEvent] and Process[Task, Underpants]]
• Split at line endings
• Convert ByteVector into UTF-8 Strings
• Partition by SSE “tag” (“data”, “id”, “event”, …)
• Emit accumulated SSE data when blank line found
• Split at line endings
  ByteVector => Seq[ByteVector]
• Convert ByteVector into UTF-8 Strings
  ByteVector => String
• Partition by SSE “tag” (“data”, “id”, “event”, …)
  String => SSEMessage
• Emit accumulated SSE data when blank line found
  SSEMessage => SSEEvent
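The last two steps (partition by tag, then emit on a blank line) can be sketched in plain Scala. This is a simplified illustration, not the code from the talk: it works on an in-memory `Seq[String]` of already-decoded lines rather than a `Process1`, it folds the `SSEMessage` step into a pattern match, it handles only the `data` and `event` fields, and the name `parseEvents` is made up. `SSEEvent` is the case class shown earlier.

```scala
object SseParseSketch {
  final case class SSEEvent(eventName: Option[String], data: String)

  // Accumulate "event" and "data" fields; a blank line completes one event.
  // Other fields (such as "id") and comment lines are ignored in this sketch.
  def parseEvents(lines: Seq[String]): Vector[SSEEvent] = {
    // State: (event name so far, data lines so far, completed events)
    val init = (Option.empty[String], Vector.empty[String], Vector.empty[SSEEvent])
    lines.foldLeft(init) {
      case ((name, data, done), "") =>
        if (data.isEmpty) (None, Vector.empty, done)
        else (None, Vector.empty, done :+ SSEEvent(name, data.mkString("\n")))
      case ((name, data, done), line) =>
        line.split(":", 2) match {
          case Array("data", v)  => (name, data :+ v.stripPrefix(" "), done)
          case Array("event", v) => (Some(v.stripPrefix(" ")), data, done)
          case _                 => (name, data, done)
        }
    }._3
  }
}
```

Fields still pending when the input ends without a blank line are dropped, which matches the rule in the SSE specification that an incomplete event is not dispatched.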
Handling Network Errors
• If a network error occurs:
• Sleep a while
• Set up the connection again and keep going
• Append the same Process definition again!
def sseStream: Process[Task, SSEEvent] = {
  httpRequest(client, url)
    .pipe(splitLines)
    .pipe(emitMessages)
    .pipe(emitEvents)
    .partialAttempt {
      case e: ConnectException => retryRequest
      case e: TimeoutException => retryRequest
    }
    .map(_.merge)
}

def retryRequest: Process[Task, SSEEvent] = {
  time.sleep(retryTime) ++ sseStream
}
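The sleep-then-reconnect pattern can be shown in isolation with a plain-Scala sketch. It differs from the stream version above in two labeled ways: it retries a single blocking call rather than appending a `Process`, and it gives up after a fixed number of attempts, whereas the slide's version keeps retrying. The names `RetrySketch`, `retrying`, `attemptsLeft` and `delayMillis` are hypothetical.

```scala
import java.net.ConnectException
import scala.annotation.tailrec

object RetrySketch {
  // Run `connect`; on ConnectException sleep and try again, up to
  // `attemptsLeft` attempts in total. The last failure is rethrown.
  @tailrec
  def retrying[A](attemptsLeft: Int, delayMillis: Long)(connect: () => A): A = {
    val result =
      try Right(connect())
      catch { case e: ConnectException if attemptsLeft > 1 => Left(e) }
    result match {
      case Right(a) => a
      case Left(_) =>
        Thread.sleep(delayMillis)
        retrying(attemptsLeft - 1, delayMillis)(connect)
    }
  }
}
```

Only the listed exception type triggers a retry; any other failure propagates immediately, which mirrors the selective `case` clauses in `partialAttempt`.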
Usage
sseStream(client, url) pipe jsonToString to io.stdOutLines
Questions?
The No-Code Way to Build a Marketing Team with One AI Agent (Download the n8n...The No-Code Way to Build a Marketing Team with One AI Agent (Download the n8n...
The No-Code Way to Build a Marketing Team with One AI Agent (Download the n8n...
SOFTTECHHUB
 

Streaming Data with scalaz-stream

  • 2. Contents • Why do we want streaming APIs? • Introduction to scalaz-stream • Use case: Server-Sent Events implementation
  • 3. Why do we want streaming APIs?
  • 4. Information with Indeterminate/unbounded size • Lines from a text file • Bytes from a binary file • Chunks of data from a TCP connection • TCP connections • Data from Kinesis or SQS or SNS or Kafka or… • Data from an API with paged implementation
  • 5. “Dangerous” Choices • scala.collection.Iterable: provides an iterator to step through items in sequence • scala.collection.immutable.Stream: lazily evaluated, possibly infinite list of values
  • 6. Do The Right Thing • Safe setup and cleanup • Constant memory usage • Constant stack usage • Refactor with confidence • Composable • Back-pressure
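The slides do not spell out why the two collection types are labelled dangerous. One possible reading, tied to the "safe setup and cleanup" goal, is that a plain iterator has no hook for releasing its resource when the consumer stops early. The sketch below illustrates that reading with standard-library Scala only; `FakeResource`, `linesOf` and `demo` are hypothetical names invented for this example.

```scala
object IteratorCleanup {
  // Hypothetical stand-in for a file or socket; it only tracks whether close() ran.
  final class FakeResource(lines: List[String]) {
    var closed = false
    private var remaining = lines
    def hasLine: Boolean = remaining.nonEmpty
    def readLine(): String = {
      val head = remaining.head
      remaining = remaining.tail
      head
    }
    def close(): Unit = { closed = true }
  }

  // The iterator can only release the resource once it has been read to the end.
  def linesOf(r: FakeResource): Iterator[String] = new Iterator[String] {
    def hasNext: Boolean = {
      val more = r.hasLine
      if (!more) r.close()
      more
    }
    def next(): String = r.readLine()
  }

  // Take two of four lines, then report what was read and whether close() ran.
  def demo(): (List[String], Boolean) = {
    val resource = new FakeResource(List("a", "b", "c", "d"))
    val firstTwo = linesOf(resource).take(2).toList
    (firstTwo, resource.closed)
  }
}
```

Here the consumer stops after two lines, so the end of the input is never reached and `close()` never runs.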
  • 7. What is scalaz-stream • Creates co-data • Safe resource management • Referential transparency • Controlled asynchronous effects
  • 9. sealed trait Process[+F[_], +O] (F: Effect, O: Output)
  • 10. case class Halt(cause: Cause) extends Process[Nothing, Nothing]
  • 11. case class Emit[+O](seq: Seq[O]) extends Process[Nothing, O]
  • 12. case class Await[+F[_], A, +O](
    req: F[A],
    rcv: (EarlyCause \/ A) => Process[F, O]
    ) extends Process[F, O]
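To make the three constructors more concrete, here is a toy model in standard-library Scala. It is not the scalaz-stream implementation. It assumes the effect `F[A]` can be modelled as a plain thunk `() => A`, drops the `cause` from `Halt`, and gives `Emit` an explicit continuation. The names `Proc`, `step`, `fromIterator` and `runLog` are chosen for this sketch.

```scala
object ToyProcess {
  // Simplified, synchronous stand-ins for Halt / Emit / Await.
  sealed trait Proc[+O]
  case object Halt extends Proc[Nothing]
  final case class Emit[+O](values: Seq[O], next: () => Proc[O]) extends Proc[O]
  final case class Await[A, +O](
      req: () => A,
      rcv: Either[Throwable, A] => Proc[O]
  ) extends Proc[O] {
    // Run the request, then pass either the failure or the value to the continuation.
    def step: Proc[O] = {
      val result: Either[Throwable, A] =
        try Right(req())
        catch { case scala.util.control.NonFatal(t) => Left(t) }
      rcv(result)
    }
  }

  // Await one element at a time from an iterator; halt at the end or on error.
  def fromIterator[A](it: Iterator[A]): Proc[A] =
    Await[Option[A], A](
      () => if (it.hasNext) Some(it.next()) else None,
      {
        case Right(Some(a)) => Emit(Seq(a), () => fromIterator(it))
        case Right(None)    => Halt
        case Left(_)        => Halt
      }
    )

  // Interpreter: step through the process and collect everything it emits.
  @annotation.tailrec
  def runLog[O](p: Proc[O], acc: Vector[O] = Vector.empty): Vector[O] = p match {
    case Halt            => acc
    case Emit(vs, next)  => runLog(next(), acc ++ vs)
    case a @ Await(_, _) => runLog(a.step, acc)
  }
}
```

The point of the sketch is that a process is only a description. Nothing is requested until an interpreter such as `runLog` walks the structure.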
  • 13. Composition Options
    Process1[I, O]
     - Stateful transducer, converts I => O (with state)
     - Combine with “pipe”
    Channel[F[_], I, O]
     - Takes I values, runs function I => F[O]
     - Combine with “through” or “observe”.
    Sink[F[_], I]
     - Takes I values, runs function I => F[Unit]
     - Add with “to”.
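The three shapes can be sketched as plain function types. This is a rough illustration, not the scalaz-stream API. It assumes the effect `F` is modelled as a thunk (`Eff`) and uses a `List` in place of a stream; `runningTotal`, `describe` and `collectInto` are hypothetical helpers.

```scala
object CompositionShapes {
  // Assumption: an effect is a suspended computation; scalaz-stream would use a type such as Task.
  type Eff[A] = () => A

  // Process1-like: a stateful transducer (running total of the inputs seen so far).
  def runningTotal(in: List[Int]): List[Int] = in.scanLeft(0)(_ + _).tail

  // Channel-like: each input becomes an effect that produces an output.
  val describe: Int => Eff[String] = n => () => s"total=$n"

  // Sink-like: each input becomes an effect that produces Unit.
  def collectInto(buf: scala.collection.mutable.ListBuffer[String]): String => Eff[Unit] =
    s => () => { buf += s; () }

  def demo(): List[String] = {
    val out = scala.collection.mutable.ListBuffer.empty[String]
    val sink = collectInto(out)
    runningTotal(List(1, 2, 3))     // plays the role of "pipe"
      .map(n => describe(n)())      // plays the role of "through"
      .foreach(s => sink(s)())      // plays the role of "to"
    out.toList
  }
}
```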
  • 14. Implementing Server-sent Events (SSE) This specification defines an API for opening an HTTP connection for receiving push notifications from a server in the form of DOM events.
  • 15. Example streams
    case class SSEEvent(eventName: Option[String], data: String)

    data: This is the first message.

    data: This is the second message, it
    data: has two lines.

    data: This is the third message.

    event: add
    data: 73857293

    event: remove
    data: 2153

    event: add
    data: 113411
  • 16. We want this type: Process[Task, SSEEvent] (“A potentially infinite stream of SSE event messages”)
  • 17. async.boundedQueue[A] • Items added to queue are removed in same order • Connect different asynchronous domains • Methods:
    def enqueueOne(a: A): Task[Unit]
    def dequeue: Process[Task, A]
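The idea of a bounded queue joining two asynchronous domains can be sketched with the JDK's `java.util.concurrent.ArrayBlockingQueue`. This is a stand-in, not `async.boundedQueue`: it blocks threads instead of returning `Task` values, and `transfer` is a hypothetical helper.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueSketch {
  // A producer thread pushes items; the calling thread pulls them out in the same order.
  def transfer(items: List[Int], capacity: Int): List[Int] = {
    val queue = new ArrayBlockingQueue[Int](capacity)
    val producer = new Thread(new Runnable {
      // put blocks while the queue is full, so the producer cannot run ahead unboundedly
      def run(): Unit = items.foreach(i => queue.put(i))
    })
    producer.start()
    // take blocks while the queue is empty
    val received = List.fill(items.size)(queue.take())
    producer.join()
    received
  }
}
```

With a capacity smaller than the number of items, the producer has to wait for the consumer. That is the back-pressure property the bounded variant is meant to provide.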
  • 18. HTTP Client Implementation • Use AsyncHttpClient • Hook into onBodyPartReceived callback • Use async.boundedQueue to convert chunks into stream
  • 19. def httpRequest(client: AsyncHttpClient, url: String): Process[Task, ByteVector] = {
      // Bounded queue (capacity 10) bridges the client's callbacks and the stream
      val contentQueue = async.boundedQueue[ByteVector](10)
      val req = client.prepareGet(url)
      req.execute(new AsyncCompletionHandler[Unit] {
        override def onBodyPartReceived(content: HttpResponseBodyPart) = {
          // Run the enqueue Task synchronously for each chunk received
          contentQueue.enqueueOne(
            ByteVector(content.getBodyByteBuffer)
          ).run
          super.onBodyPartReceived(content)
        }
      })
      // Expose the queued chunks as a stream
      contentQueue.dequeue
    }
  • 21. req.execute(new AsyncCompletionHandler[Unit] {
      ...
      override def onCompleted(r: Response): Unit = {
        logger.debug("Request completed")
        contentQueue.close.run
      }
      ...
    }
  • 22. How to terminate stream with errors?
  • 23. req.execute(new AsyncCompletionHandler[Unit] {
      ...
      override def onThrowable(t: Throwable): Unit = {
        logger.debug("Request failed with error", t)
        contentQueue.fail(t).run
      }
      ...
    }
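The difference between a normal end and an end with an error can be sketched with an explicit signal type on a JDK queue. This is only an illustration: the scalaz-stream queue tracks termination internally, and its ordering of pending items against a failure may differ from this sketch, where queued chunks are always delivered first. `Signal`, `Chunk`, `Closed`, `Failed`, `drain` and `demo` are hypothetical names.

```scala
import java.util.concurrent.ArrayBlockingQueue

object QueueTermination {
  // Hypothetical signal type carried through the queue.
  sealed trait Signal[+A]
  final case class Chunk[A](value: A) extends Signal[A]
  case object Closed extends Signal[Nothing]
  final case class Failed(error: Throwable) extends Signal[Nothing]

  // Drain until Closed (normal end) or Failed (end that preserves the error).
  def drain[A](queue: ArrayBlockingQueue[Signal[A]]): Either[Throwable, Vector[A]] = {
    @annotation.tailrec
    def loop(acc: Vector[A]): Either[Throwable, Vector[A]] = queue.take() match {
      case Chunk(a)  => loop(acc :+ a)
      case Closed    => Right(acc)
      case Failed(t) => Left(t)
    }
    loop(Vector.empty)
  }

  // Enqueue two chunks, then either a failure or a normal close.
  def demo(fail: Boolean): Either[Throwable, Vector[String]] = {
    val queue = new ArrayBlockingQueue[Signal[String]](10)
    queue.put(Chunk("a"))
    queue.put(Chunk("b"))
    queue.put(if (fail) Failed(new java.io.IOException("connection reset")) else Closed)
    drain(queue)
  }
}
```

Keeping the error as a value lets the consumer decide what to do with it, for example retrying the request.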
  • 25. • Split at line endings • Convert ByteVector into UTF-8 Strings • Partition by SSE “tag” (“data”, “id”, “event”, …) • Emit accumulated SSE data when blank line found
  • 26. • Split at line endings: ByteVector => Seq[ByteVector]
    • Convert ByteVector into UTF-8 Strings: ByteVector => String
    • Partition by SSE “tag” (“data”, “id”, “event”, …): String => SSEMessage
    • Emit accumulated SSE data when blank line found: SSEMessage => SSEEvent
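The four steps can be sketched in standard-library Scala over a complete payload. This is a simplified, non-streaming sketch. It decodes before splitting, which reverses the first two steps. It handles only the `event` and `data` fields. The shape of `SSEMessage` is an assumption, since the slides do not show its definition.

```scala
import java.nio.charset.StandardCharsets

object SseParsing {
  final case class SSEMessage(field: String, value: String) // assumed shape
  final case class SSEEvent(eventName: Option[String], data: String)

  // Convert bytes into a UTF-8 String.
  def decode(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  // Split at line endings (CRLF, LF or CR), keeping blank lines.
  def splitLines(text: String): List[String] = text.split("\r\n|\n|\r", -1).toList

  // "field: value" becomes an SSEMessage; one space after the colon is dropped.
  def toMessage(line: String): Option[SSEMessage] =
    if (line.isEmpty || line.startsWith(":")) None // blank line or comment
    else {
      val idx = line.indexOf(':')
      if (idx < 0) Some(SSEMessage(line, ""))
      else Some(SSEMessage(line.take(idx), line.drop(idx + 1).stripPrefix(" ")))
    }

  def parse(bytes: Array[Byte]): List[SSEEvent] = {
    // State: (pending event name, pending data lines, completed events)
    val init = (Option.empty[String], Vector.empty[String], Vector.empty[SSEEvent])
    val (_, _, events) = splitLines(decode(bytes)).foldLeft(init) {
      case ((name, data, done), "") =>
        // Blank line: emit the accumulated event if it has data, then reset.
        if (data.isEmpty) (None, Vector.empty, done)
        else (None, Vector.empty, done :+ SSEEvent(name, data.mkString("\n")))
      case ((name, data, done), line) =>
        toMessage(line) match {
          case Some(SSEMessage("event", v)) => (Some(v), data, done)
          case Some(SSEMessage("data", v))  => (name, data :+ v, done)
          case _                            => (name, data, done) // other fields ignored here
        }
    }
    events.toList
  }
}
```

For example, `SseParsing.parse("event: add\ndata: 73857293\n\n".getBytes("UTF-8"))` yields a single `SSEEvent(Some("add"), "73857293")`. A streaming version would carry the same fold state across chunks instead of requiring the whole payload up front.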
  • 27. Handling Network Errors • If a network error occurs: • Sleep a while • Set up the connection again and keep going • Append the same Process definition again!
  • 28. def sseStream: Process[Task, SSEEvent] = {
      httpRequest(client, url)
        .pipe(splitLines)
        .pipe(emitMessages)
        .pipe(emitEvents)
        .partialAttempt {
          case e: ConnectException => retryRequest
          case e: TimeoutException => retryRequest
        }
        .map(_.merge)
    }

    def retryRequest: Process[Task, SSEEvent] = {
      time.sleep(retryTime) ++ sseStream
    }
  • 29. Usage
    sseStream(client, url) pipe jsonToString to io.stdOutLines