```{r include=FALSE}
knitr::opts_chunk$set(eval = FALSE)
source("r/render.R")
source("r/plots.R")
library(ggplot2)
```
# Streaming {#streaming}
> Our stories aren’t over yet.
>
> --- Arya Stark
Looking back at the previous chapters, we've covered a good deal, but not everything. We’ve analyzed tabular datasets, performed unsupervised learning over raw text, analyzed graphs and geographic datasets, and even transformed data with custom R code! So now what?
Though we weren't explicit about this, we've assumed until this point that your data is static and doesn't change over time. But suppose for a moment your job is to analyze traffic patterns to give recommendations to the department of transportation. A reasonable approach would be to analyze historical data and then design predictive models that compute forecasts overnight. Overnight? That’s very useful, but traffic patterns change by the hour and even by the minute. You could try to preprocess and predict faster and faster, but eventually this approach breaks down—you can’t load large-scale datasets, transform them, score them, unload them, and repeat this process by the second.
Instead, we need to introduce a different kind of dataset—one that is not static but rather dynamic, one that is like a table but is growing constantly. We will refer to such datasets as _streams_.
## Overview
We<!--((("streaming", "overview of")))--> know how to work with large-scale static datasets, but how can we reason about large-scale real-time datasets? Datasets with an infinite number of entries are known as __streams__.
For static datasets, if we were to do real-time scoring using a pretrained topic model, the entries would be lines of text; for real-time datasets, we would perform the same scoring over an infinite number of lines of text. Now, in practice, you will never process an infinite number of records. You will eventually stop the stream—or this universe might end, whichever comes first. Regardless, thinking of the datasets as infinite makes it much easier to reason about them.
Streams are most relevant when processing real-time data—for example, when analyzing a Twitter feed or stock prices. Both examples have well-defined columns, like "tweet" or "price," but there are always new rows of data to be analyzed.
_Spark Streaming_ provides<!--((("Spark Streaming")))--> scalable and fault-tolerant data processing over streams of data. That means you can use many machines to process multiple streaming sources, perform joins with other streams or static sources, and recover from failures with at-least-once guarantees (each message is guaranteed to be delivered, but it may be delivered more than once).
In Spark, you<!--((("source")))((("sink")))--> create streams by defining a _source_, a _transformation_, and a _sink_; you can think of these steps as reading, transforming, and writing a stream, as Figure \@ref(fig:streaming-working) illustrates.
```{r streaming-working, echo=FALSE, fig.cap='Working with Spark Streaming', fig.align='center', eval = TRUE, out.height = '280pt', out.width = 'auto'}
render_nomnoml("
#direction: right
[Real-Time|[<note>File Source
Kafka Source
]]->[Transform|[<note>dplyr
SQL
Feature Transformers
Pipelines
Distributed R
]]
[Static|[<note>File Systems
Storage Systems]
]->[Transform]
[Transform]->[Sink|[<note>File Sink
Kafka Sink]]
", "images/streaming-working.png")
```
Let's take a look at each of these a little more closely:
Reading
: Streams read data using any of the `stream_read_*()` functions; the read operation defines the _source_ of the stream. You can define one or multiple sources from which to read.
Transforming
: A stream can perform one or multiple transformations using `dplyr`, `SQL`, feature transformers, scoring pipelines, or distributed R code. Transformations can not only be applied to one or more streams, but can also use a combination of streams and static data sources; for instance, those loaded into Spark with the `spark_read_*()` functions—this means that you can combine static data and real-time data sources with ease.
Writing
: Write operations are performed with the family of `stream_write_*()` functions; the write operation defines the _sink_ of the stream. You can specify a single sink or multiple sinks to write data to.
You can read and write to streams in several different file formats: CSV, JSON, Parquet, Optimized Row Columnar (ORC), and text (see Table \@ref(tab:streaming-functions-table)). You<!--((("Kafka", "reading and writing from")))--> also can read and write from and to Kafka, which we will introduce later on.
```{r streaming-functions-table, eval=TRUE, echo=FALSE}
knitr::kable(
  data.frame(
    Format = c("CSV", "JSON", "Kafka", "ORC", "Parquet", "Text", "Memory"),
    Read = c("stream_read_csv", "stream_read_json", "stream_read_kafka",
             "stream_read_orc", "stream_read_parquet", "stream_read_text",
             ""),
    Write = c("stream_write_csv", "stream_write_json", "stream_write_kafka",
              "stream_write_orc", "stream_write_parquet", "stream_write_text",
              "stream_write_memory")
  ),
  booktabs = TRUE,
  caption = "Spark functions to read and write streams"
)
```
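Switching formats only requires swapping the corresponding read and write functions. Here is a minimal sketch, assuming `sparklyr` is attached with an active connection `sc` (created shortly below); the _source-json/_ and _destination-parquet/_ paths are illustrative, and the folder would need at least one JSON file present so the schema can be inferred:
```{r streaming-formats-json-parquet}
# Watch a folder for newline-delimited JSON files as they arrive and
# persist the incoming rows as Parquet; both paths are illustrative.
stream_read_json(sc, "source-json/") %>%
  stream_write_parquet("destination-parquet/")
```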
Since the transformation step is optional, the simplest stream we can define is one that continuously copies text files between source and destination.
First, install the `future` package using `install.packages("future")` and connect to Spark.
```{r streaming-overview-prepare, echo=FALSE}
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
```
Since a stream requires the source to exist, create a `source` folder:
```{r}
dir.create("source")
```
We are now ready to define our first stream!
```{r streaming-overview-copy}
stream <- stream_read_text(sc, "source/") %>%
  stream_write_text("destination/")
```
The<!--((("commands", "stream_write_*()")))--> stream starts running with `stream_write_*()`; once executed, the stream will monitor the _source/_ path and process data into the _destination/_ path as it arrives.
We can use `stream_generate_test()` to produce a file every second containing lines of text that follow a given distribution; you can read more about this in the [Appendix](#appendix). In practice, you would connect to existing sources without having to generate data artificially. We can then use `stream_view()` to track the rows per second (rps) being processed in the source and in the destination, as well as their latest values over time:
```{r streaming-overview-test}
future::future(stream_generate_test(interval = 0.5))
```
```{r streaming-overview-view, eval=FALSE}
stream_view(stream)
```
The result is shown in Figure \@ref(fig:streaming-view-stream).
```{r streaming-view-stream, eval=TRUE, fig.align='center', echo=FALSE, fig.cap='Monitoring a stream generating rows following a binomial distribution'}
render_image("images/streaming-stream-view.png")
```
Notice that the rps rate in the destination stream is higher than that in the source stream. This is expected and desirable since Spark measures incoming rates from the source stream, but also actual row-processing times in the destination stream. For example, if 10 rows per second are written to the _source/_ path, the incoming rate is 10 rps. However, if it takes Spark only 0.01 seconds to write all those 10 rows, the output rate is 100 rps.
Use `stream_stop()` to properly stop processing data from this stream:
```{r streaming-overview-stop, eval=FALSE}
stream_stop(stream)
```
This exercise introduced how we can easily start a Spark stream that reads and writes data based on a simulated stream. Next, let's do something more interesting than simply copying data, by applying proper transformations.
## Transformations
In<!--((("streaming", "transformations", id="Strans12")))((("transformations", "overview of")))--> a real-life scenario, the incoming data from a stream would not be written as is to the output. The Spark Streaming job would make transformations to the data, and then write the transformed data.
Streams<!--((("DataFrames", "transforming")))--> can be transformed using `dplyr`, SQL queries, ML pipelines, or R code. We can use as many transformations as needed in the same way that Spark DataFrames can be transformed with `sparklyr`.
The source of the transformation can be a stream or DataFrame, but the output is always a stream. If needed, you can always take a snapshot from the destination stream and then save the output as a DataFrame. That is what `sparklyr` will do for you if a destination stream is not specified.
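For example, here is a minimal sketch of taking such a snapshot explicitly, by writing the stream into Spark's in-memory sink and then querying it as a regular table; the `stream_snapshot` name is illustrative:
```{r streaming-transformations-snapshot}
library(dplyr)

# Continuously append the incoming rows into Spark's in-memory sink...
stream_read_csv(sc, "source") %>%
  stream_write_memory("stream_snapshot", mode = "append")

# ...then query whatever has accumulated so far as if it were static.
tbl(sc, "stream_snapshot")
```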
Each of the following subsections covers an option provided by `sparklyr` to perform transformations on a stream.
### Analysis
You<!--((("transformations", "analysis")))--> can analyze streams with `dplyr` verbs and SQL using `DBI`. As a quick example, we will filter rows and add columns over a stream. We won’t explicitly call `stream_generate_test()`, but you can call it on your own through the `later` package if you feel the urge to verify that data is being processed continuously:
```{r streaming-analysis}
library(dplyr)
stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  mutate(y = round(x / 100))
```
```
# Source: spark<?> [inf x 2]
x y
<int> <dbl>
1 701 7
2 702 7
3 703 7
4 704 7
5 705 7
6 706 7
7 707 7
8 708 7
9 709 7
10 710 7
# … with more rows
```
It's also possible to perform aggregations over the entire history of the stream. The history could be filtered or not:
```{r streaming-analysis-aggregations}
stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  mutate(y = round(x / 100)) %>%
  count(y)
```
```
# Source: spark<?> [inf x 2]
y n
<dbl> <dbl>
1 8 25902
2 9 25902
3 10 13210
4 7 12692
```
Grouped aggregations of the latest data in the stream require a timestamp. The timestamp notes when the reading function (in this case `stream_read_csv()`) first "saw" that specific record. In Spark Streaming terminology, the timestamp<!--((("watermarks")))--> is called a _watermark_. The `stream_watermark()` function adds the timestamp. In this example, the watermark will be the same for all records, since the five files were read by the stream after they were created. Note that only<!--((("Kafka", "reading and writing from")))--> Kafka and memory _outputs_ support watermarks:
```{r streaming-analysis-watermark}
stream_read_csv(sc, "source") %>%
  stream_watermark()
```
```
# Source: spark<?> [inf x 2]
x timestamp
<int> <dttm>
1 276 2019-06-30 07:14:21
2 277 2019-06-30 07:14:21
3 278 2019-06-30 07:14:21
4 279 2019-06-30 07:14:21
5 280 2019-06-30 07:14:21
6 281 2019-06-30 07:14:21
7 282 2019-06-30 07:14:21
8 283 2019-06-30 07:14:21
9 284 2019-06-30 07:14:21
10 285 2019-06-30 07:14:21
# … with more rows
```
After the watermark is created, you can use it in the `group_by()` verb and then pipe the result into `summarise()` to compute summary statistics for the stream:
```{r streaming-analysis-watermark-agg}
stream_read_csv(sc, "source") %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(
    max_x = max(x, na.rm = TRUE),
    min_x = min(x, na.rm = TRUE),
    count = n()
  )
```
```
# Source: spark<?> [inf x 4]
timestamp max_x min_x count
<dttm> <int> <int> <dbl>
1 2019-06-30 07:14:55 1000 1 259332
```
### Modeling
Spark<!--((("transformations", "modeling")))((("online learning")))((("modeling", "Spark streams")))--> streams currently don't support training on real-time datasets. Aside from the technical challenges, even if it were possible, it would be quite difficult to train models since the model itself would need to adapt over time. Known as _online learning_, this is perhaps something that Spark will support in the future.
That said, there are other modeling concepts we can use with streams, like feature transformers and scoring. Let's try out a feature transformer with streams, and leave scoring for the next section, since we will need to train a model.
The<!--((("commands", "ft_bucketizer()")))--> next example uses the `ft_bucketizer()` feature transformer to modify the stream followed by regular `dplyr` functions, which you can use just as you would with static datasets:
```{r streaming-modeling-features}
stream_read_csv(sc, "source") %>%
  mutate(x = as.numeric(x)) %>%
  ft_bucketizer("x", "buckets", splits = 0:10 * 100) %>%
  count(buckets) %>%
  arrange(buckets)
```
```
# Source: spark<?> [inf x 2]
# Ordered by: buckets
buckets n
<dbl> <dbl>
1 0 25747
2 1 26008
3 2 25992
4 3 25908
5 4 25905
6 5 25903
7 6 25904
8 7 25901
9 8 25902
10 9 26162
```
### Pipelines
Spark<!--((("transformations", "pipelines")))((("pipelines", "Spark pipelines")))--> pipelines can be used for scoring streams, but not to train over streaming data. The former is fully supported, while the latter is a feature under active development by the Spark community.
To score a stream, it's necessary to first create our model. So let's build and fit a simple pipeline:
```{r streaming-pipelines}
cars <- copy_to(sc, mtcars)
model <- ml_pipeline(sc) %>%
  ft_binarizer("mpg", "over_30", 30) %>%
  ft_r_formula(over_30 ~ wt) %>%
  ml_logistic_regression() %>%
  ml_fit(cars)
```
**Tip:** If you choose to, you can make use of other concepts presented in [Chapter 5](#pipelines), like saving and reloading pipelines through `ml_save()` and `ml_load()` before scoring streams.
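As a quick sketch of that tip (the _cars-model_ path is illustrative):
```{r streaming-pipelines-save}
# Persist the fitted pipeline model to disk, then reload it before scoring;
# the "cars-model" path is illustrative.
ml_save(model, "cars-model", overwrite = TRUE)
model <- ml_load(sc, "cars-model")
```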
We can then generate a stream based on `mtcars` using `stream_generate_test()`, and score the model using `ml_transform()`:
```{r streaming-pipelines-transform}
future::future(stream_generate_test(mtcars, "cars-stream", iterations = 5))
ml_transform(model, stream_read_csv(sc, "cars-stream"))
```
```
# Source: spark<?> [inf x 17]
mpg cyl disp hp drat wt qsec vs am gear carb over_30
<dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int> <dbl>
1 15.5 8 318 150 2.76 3.52 16.9 0 0 3 2 0
2 15.2 8 304 150 3.15 3.44 17.3 0 0 3 2 0
3 13.3 8 350 245 3.73 3.84 15.4 0 0 3 4 0
4 19.2 8 400 175 3.08 3.84 17.0 0 0 3 2 0
5 27.3 4 79 66 4.08 1.94 18.9 1 1 4 1 0
6 26 4 120. 91 4.43 2.14 16.7 0 1 5 2 0
7 30.4 4 95.1 113 3.77 1.51 16.9 1 1 5 2 1
8 15.8 8 351 264 4.22 3.17 14.5 0 1 5 4 0
9 19.7 6 145 175 3.62 2.77 15.5 0 1 5 6 0
10 15 8 301 335 3.54 3.57 14.6 0 1 5 8 0
# … with more rows, and 5 more variables: features <list>, label <dbl>,
# rawPrediction <list>, probability <list>, prediction <dbl>
```
Though this example was put together with a few lines of code, what we just accomplished is actually quite impressive. You copied data into Spark, performed feature engineering, trained a model, and scored the model over a real-time dataset, with just seven lines of code! Now let's try using custom transformations in real time.
### Distributed R {#streaming-r-code}
Arbitrary<!--((("transformations", "distributed R")))((("distributed R", "streaming")))--> R code can also be used to transform a stream with the use of `spark_apply()`. This approach follows the same principles discussed in [Chapter 11](#distributed), where `spark_apply()` runs R code over each executor in the cluster where data is available. This enables processing high-throughput streams and fulfills low-latency requirements:
```{r streaming-distributed}
stream_read_csv(sc, "cars-stream") %>%
  select(mpg) %>%
  spark_apply(~ round(.x), mpg = "integer") %>%
  stream_write_csv("cars-round")
```
which, as you would expect, processes data from `cars-stream` into `cars-round` by running the custom `round()` R function. Let’s peek into the output sink:
```{r streaming-distributed-read}
spark_read_csv(sc, "cars-round")
```
```
# Source: spark<carsround> [?? x 1]
mpg
<dbl>
1 16
2 15
3 13
4 19
5 27
6 26
7 30
8 16
9 20
10 15
# … with more rows
```
Again, make sure you apply the concepts you already know about `spark_apply()` when using streams; for instance, you should consider using `arrow` to significantly improve performance.
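As a sketch of that advice (assuming the `arrow` package is installed; the _cars-round-arrow_ sink path is illustrative), simply attaching `arrow` before defining the stream lets `sparklyr` use it for serialization:
```{r streaming-distributed-arrow}
# With arrow attached, sparklyr serializes data between Spark and the R
# workers through Arrow, which typically speeds up spark_apply().
library(arrow)

stream_read_csv(sc, "cars-stream") %>%
  select(mpg) %>%
  spark_apply(~ round(.x), mpg = "integer") %>%
  stream_write_csv("cars-round-arrow")
```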
This was our last transformation for streams. We'll now learn how to use Spark Streaming with Kafka.<!--((("", startref="Strans12")))--> Before we move on, disconnect from Spark:
```{r streaming-distributed-disconnect}
spark_disconnect(sc)
```
## Kafka
Apache Kafka<!--((("Kafka", "streaming")))((("Apache Kafka", "streaming")))((("streaming", "Kafka")))--> is an open source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation. It is written in Scala and Java. To describe it using an analogy, Kafka is to real-time storage what Hadoop is to static storage.
Kafka stores the stream as records, which consist of a key, a value, and a timestamp. It can handle multiple streams that contain different information, by categorizing them by topic. Kafka is commonly used to connect multiple real-time applications. A _producer_ is<!--((("producers")))((("consumers")))((("subscribers")))--> an application that streams data into Kafka, while a _consumer_ is the one that reads from Kafka; in Kafka terminology, a consumer application _subscribes_ to topics. Therefore, the most basic workflow we can accomplish with Kafka is one with a single producer and a single consumer; this is illustrated in Figure \@ref(fig:streaming-kafka-apis).
```{r streaming-kafka-apis, echo=FALSE, fig.cap='A basic Kafka workflow', fig.align='center', eval = TRUE, out.height = '100pt', out.width = 'auto'}
render_nomnoml("
#leading: 4
#padding: 10
#arrowSize: 0.3
[<sender>Producer]-[<label>Stream]
[Stream]->[<transceiver>Kafka]
[Kafka] - [<label>Subscribe]
[Subscribe]->[<receiver>Consumer]
", "images/streaming-kafka-apis.png")
```
If you are new to Kafka, we don’t recommend you run the code from this section. However, if you're really motivated to follow along, you will first need to install Kafka as explained in [Appendix](#appendix) or deploy it in your cluster.
Using Kafka also requires you to have the Kafka package when connecting to Spark. Make sure this is specified in your connection `config`:
```{r streaming-kafka-connect}
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", config = list(
  sparklyr.shell.packages = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
))
```
Once connected, it’s straightforward to read data from a stream:
```{r streaming-kafka-read, eval=FALSE}
stream_read_kafka(
  sc,
  options = list(
    kafka.bootstrap.servers = "host1:9092,host2:9092",
    subscribe = "<topic-name>"
  )
)
```
However, notice that you need to properly configure the `options` list; `kafka.bootstrap.servers` expects a comma-separated list of Kafka hosts, while `topic` and `subscribe` define which topic should be used when writing to or reading from Kafka, respectively.
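To make the record structure concrete, here is a sketch (the host and topic names are illustrative) that coerces the binary `key` and `value` fields to character and keeps the metadata columns that Spark's Kafka source exposes:
```{r streaming-kafka-columns, eval=FALSE}
# Each Kafka record arrives with binary key and value fields plus metadata
# columns such as topic, partition, offset, and timestamp.
stream_read_kafka(
  sc,
  options = list(
    kafka.bootstrap.servers = "localhost:9092",
    subscribe = "letters"
  )
) %>%
  mutate(key = as.character(key), value = as.character(value)) %>%
  select(key, value, topic, partition, offset, timestamp)
```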
Though we've started by presenting a simple single-producer and single-consumer use case, Kafka also allows much more complex interactions. We will next read from one topic, process its data, and then write the results to a different topic. Systems that are both consumers and producers of Kafka topics are referred to as _stream processors_. In Figure \@ref(fig:streaming-kafka-two-outputs), the stream processor reads topic A and then writes results to topic B. This allows a given consumer application to read results instead of the "raw" feed data.
```{r streaming-kafka-two-outputs, echo=FALSE, fig.cap='A Kafka workflow using stream processors', fig.align='center', eval = TRUE, out.height = '260pt', out.width = 'auto'}
render_nomnoml("
#padding: 20
#spacing: 80
#arrowSize: 0.3
[<sender>Producer]-> [Kafka]
[Kafka]->[Stream Processor]
[Stream Processor]->[Kafka]
[Kafka]->[<receiver>Consumer]
", "images/streaming-kafka-two-outputs.png")
```
Three modes are available when processing Kafka streams in Spark: _complete_, _update_, and _append_. The `complete` mode provides the totals for every group every time there is a new batch; `update` provides totals for only the groups that have updates in the latest batch; and `append` adds raw records to the target topic. The `append` mode is not meant for aggregates, but works well for passing a filtered subset to the target topic.
In our next example, the producer streams random letters into Kafka under a `letters` topic. Then, Spark will act as the stream processor, reading the `letters` topic and computing unique letters, which are then written back to Kafka under the `totals` topic. We'll use the `update` mode when writing back into Kafka; that is, only the totals that changed will be sent to Kafka. This change is determined after each batch from the `letters` topic:
```{r streaming-kafka-read-write, eval=FALSE}
hosts <- "localhost:9092"
read_options <- list(kafka.bootstrap.servers = hosts, subscribe = "letters")
write_options <- list(kafka.bootstrap.servers = hosts, topic = "totals")
stream_read_kafka(sc, options = read_options) %>%
  mutate(value = as.character(value)) %>%    # coerce into a character
  count(value) %>%                           # group and count letters
  mutate(value = paste0(value, "=", n)) %>%  # kafka expects a value field
  stream_write_kafka(mode = "update",
                     options = write_options)
```
You can take a quick look at totals by reading from Kafka:
```{r streaming-kafka-preview, eval=FALSE}
totals_options <- list(kafka.bootstrap.servers = hosts, subscribe = "totals")

stream_read_kafka(sc, options = totals_options)
```
Using a new terminal session, use Kafka’s command-line tool to manually add single letters into the `letters` topic:
```
kafka-console-producer.sh --broker-list localhost:9092 --topic letters
>A
>B
>C
```
The letters that you input are pushed to Kafka, read by Spark, aggregated within Spark, and pushed back into Kafka. Then, finally, they are consumed by Spark again to give you a glimpse into the `totals` topic. This was quite a setup, but it is also a realistic configuration commonly found in real-time processing projects.
Next, we will use the Shiny framework to visualize streams, in real time!
## Shiny
Shiny’s reactive<!--((("Shiny Server")))((("streaming", "Shiny")))--> framework is well suited to support streaming information, which you can use to display real-time data from Spark using `reactiveSpark()`. There is far more to learn about Shiny than we could possibly present here. However, if you're already familiar with Shiny, this example should be quite easy to understand.
We have a modified version of the _k_-means Shiny example that, instead of getting the data from the static `iris` dataset, is generated with `stream_generate_test()`, consumed by Spark, retrieved to Shiny through `reactiveSpark()`, and then displayed as shown in Figure \@ref(fig:streaming-shiny-app).
To run this example, store the following Shiny app under `shiny/shiny-stream.R`:
```{r streaming-shiny, eval=FALSE, exercise=TRUE}
library(sparklyr)
library(shiny)

unlink("shiny-stream", recursive = TRUE)
dir.create("shiny-stream", showWarnings = FALSE)

sc <- spark_connect(
  master = "local", version = "2.3",
  config = list(sparklyr.sanitize.column.names = FALSE))

ui <- pageWithSidebar(
  headerPanel('Iris k-means clustering from Spark stream'),
  sidebarPanel(
    selectInput('xcol', 'X Variable', names(iris)),
    selectInput('ycol', 'Y Variable', names(iris),
                selected = names(iris)[[2]]),
    numericInput('clusters', 'Cluster count', 3,
                 min = 1, max = 9)
  ),
  mainPanel(plotOutput('plot1'))
)

server <- function(input, output, session) {
  iris <- stream_read_csv(sc, "shiny-stream",
                          columns = sapply(datasets::iris, class)) %>%
    reactiveSpark()

  selectedData <- reactive(iris()[, c(input$xcol, input$ycol)])
  clusters <- reactive(kmeans(selectedData(), input$clusters))

  output$plot1 <- renderPlot({
    par(mar = c(5.1, 4.1, 0, 1))
    plot(selectedData(), col = clusters()$cluster, pch = 20, cex = 3)
    points(clusters()$centers, pch = 4, cex = 4, lwd = 4)
  })
}

shinyApp(ui, server)
```
This Shiny application can then be launched with `runApp()`, like so:
```{r eval=FALSE, exercise=TRUE}
shiny::runApp("shiny/shiny-stream.R")
```
While the Shiny app is running, launch a new R session from the same directory and create a test stream with `stream_generate_test()`. This will generate a stream of continuous data that Spark can process and Shiny can visualize (as illustrated in Figure \@ref(fig:streaming-shiny-app)):
```{r eval=FALSE, exercise=TRUE}
sparklyr::stream_generate_test(datasets::iris, "shiny/shiny-stream",
  rep(5, 10^3))
```
```{r streaming-shiny-app, eval = TRUE, fig.align = 'center', fig.cap = 'Progression of Spark reactive loading data into the Shiny app', echo = FALSE}
render_image("images/streaming-shiny-app.png")
```
In this section you learned how easy it is to create a Shiny app that can be used for several different purposes, such as monitoring and dashboarding.
In a more complex implementation, the source would more likely be a Kafka stream.
Before we transition, disconnect from Spark and clear the folders that we used:
```{r}
spark_disconnect(sc)
unlink(c("source", "destination", "cars-stream",
"car-round", "shiny/shiny-stream"), recursive = TRUE)
```
## Recap
From static datasets to real-time datasets, you’ve truly mastered many of the large-scale computing techniques. Specifically, in this chapter, you learned how static data can be generalized to real time if we think of it as an infinite table. We were then able to create a simple stream, without any data transformations, that copies data from point A to point B.
This humble start became quite useful when you learned about the several different transformations you can apply to streaming data—from data analysis transformations using the `dplyr` and `DBI` packages, to feature transformers introduced while modeling, to fully fledged pipelines capable of scoring in real time, to, last but not least, transforming datasets with custom R code. This was a lot to digest, for sure.
We then presented Apache Kafka as a reliable and scalable solution for real-time data. We showed you how a real-time system could be structured by introducing you to consumers, producers, and topics. These, when properly combined, create powerful abstractions to process real-time data.
Then we closed with "a cherry on top of the sundae": presenting how to use Spark Streaming in Shiny. Since a stream can be transformed into a reactive (which is the lingua franca of the world of reactivity), the ease of this approach was a nice surprise.
It's time now to move on to our very last (and quite short) chapter, [Chapter 13](#contributing); there we'll try to persuade you to use your newly acquired knowledge for the benefit of the Spark and R communities at large.