CARL LERCHE: My name is Carl Lerche, and I'm going to be talking about Apache Storm. I'm shooting for forty minutes, it's going to be a long talk, and I'm going to try to cram everything in. I already got a late start, so there probably won't be time for questions. I'm also probably going to hand-wave over a few things, so if a question comes up, just send me a Tweet directly and I'll respond after the talk. Or just come and talk to me.
So I work for Tilde, and our product is Skylight. We're building a smart profiler for your Rails app, and I've actually built the entire backend using Storm. So if you want to talk about how that works, how I've ended up using Storm in the real world, I'll be happy to talk about that too.
So what is Storm? Let's start with how it describes itself on the website. It calls itself a distributed realtime computation system, and that sounds really, really fancy. The first time I thought about using it, I went to see what Storm was all about, saw that, and thought: whoa, step back, this sounds like too-serious business for me, I need something simpler. It took me maybe another six months of saying, OK, I'm going to take the time and learn this. And as I got to know it, it came to me: oh, this can be used for many, many other use cases. If there's time at the end I might talk about some of them, but I'm going to try to get into the nitty-gritty sooner rather than later.
For now, I'm just going to say that Storm is a really powerful worker system. Some things about it. It is distributed. Very, very, very distributed, which is both a good and a bad thing. The good thing is you can spread your load across many servers. The bad thing is that your ops setup is going to look something like this, which can turn out to be somewhat of an operational headache. You'll end up having to run a ZooKeeper cluster. You'll end up having to run the Storm Nimbus process somewhere, plus a number of other Storm coordination processes, and every server is going to have a worker process. And while you're at it, you've gone this far, let's throw in a distributed database and some distributed queues too. The main point is that there's going to be operational overhead, so don't do it just for fun. There needs to be a good reason. If you can get the job done on a single server, go for that.
It's fault-tolerant, and what I mean by that is it's able to recover and continue making progress in the event of a failure. I know a lot of existing systems claim to do this, but the reality is that handling faults is really, really hard. Storm doesn't make it seamless; the reality is that it can't be seamless. But, and I'm going to talk about this later, I think the way Storm does it is about as easy as it can get. I hope.
It's really fast, with very low overhead. Some completely useless benchmark numbers: it's been clocked processing over a million messages per second on a single server. A completely useless number, but it sounds good.
Supposedly it's language agnostic, and by that I mean you can supposedly use absolutely any language to build your data processing pipeline. There are examples on the website that use bash; I don't know why, it's probably a novelty, if distributed bash sounds good to you. I guess that means you could probably use MRI. However, I've personally only used it on the JVM. It's a JVM-based project, written in Java and Clojure, and it uses many Java libraries. So if you're going to try it, I'd recommend trying it out on the JVM.
The good news is we have JRuby, which has done an extremely good job of integrating with Java. So you can use all these Java libraries and still just write Ruby. One of the things I'm also going to hand-wave over is the details of getting this running on JRuby, but there is a GitHub project called redstorm; you can probably just Google "GitHub redstorm". I've realized I didn't actually include any links, but, again, I can find them later.
Redstorm provides a JRuby binding to Storm, and it goes a bit further: it provides Ruby DSLs for everything related to Storm, so you can define all your data transformations in terms of Ruby DSLs.
The way I'm going to go over Storm is by building up a straw man, and that straw man is: how would one implement Twitter trending topics? I think we all hopefully know what a trending topic is; that's part of why I picked it. Essentially, everybody Tweets, and Tweets can include hash tags. Hash tag RailsConf, hash tag whatever. The hash tags that occur at the highest rates end up getting bubbled up, and Twitter says, ah, these are the topics that are trending, so that everybody can see what's going on. Around here, you might see that RailsConf is trending. Another reason I picked it is that it's very time-sensitive, which ties into the real time aspect of Storm.
All right. To implement this, the way I'm going to calculate the rate, how often a hash tag is occurring, is with an exponentially weighted moving average. Fancy. Basically what it means is that as we process the Tweets, I'm going to count the number of occurrences of each hash tag over a fixed interval, say every five seconds; the exact interval doesn't really matter. Then we're going to average those counts.
But instead of doing a normal average, where you sum all the values and divide by the number of values, we're going to do a weighted average, which causes the older occurrences to drop in weight exponentially. This is how Linux computes its one-minute, five-minute, and fifteen-minute load averages.
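To make that concrete, here's a minimal sketch of the idea. This is my own illustration rather than anything from the slides; alpha is the smoothing factor you'd tune.

    # Minimal EWMA sketch (illustrative; not from the slides).
    # Each bucket, the rate is pulled toward the newest count, and
    # older counts decay exponentially. Higher alpha = less smoothing.
    class EWMA
      def initialize(alpha)
        @alpha = alpha
        @rate  = 0.0
      end

      # Fold in the count observed during the latest bucket.
      def tick(count)
        @rate = @alpha * count + (1 - @alpha) * @rate
      end

      attr_reader :rate
    end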
It's a pretty nice way of smoothing out the curve, and you can tune how aggressively the curve gets smoothed. First, to build up some context, let's look at how one might implement this with Sidekiq, or Resque, or any other simple system where a worker just pops messages off a queue.
This is what it might look like. You've got your Rails app, which pushes data into Redis, and then the worker says: OK, I'm going to pop a message off of Redis, read state straight from the database, process the message, and write straight back out to the database. The Rails app can then query the result by reading it straight from the database. Here's how it might run. Is that bright enough? Hopefully it is. The comment basically says: yes, this is very naive, I know it, but that's not the point. Don't say, oh, you could do this a million times faster. That's not the point. Hopefully it's readable.
So, tags. The basic premise is that you have an entry method which takes a Tweet as an argument. The first step is to extract the hash tags from the body, so I get an array of tags. Then I loop over each tag and read from the database: is there already an existing record? If so, I update the moving average for it and save it back.
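The slide itself isn't reproduced here, so this is a hedged reconstruction of that naive worker from the description; the HashTag model and the update_ewma method are assumptions on my part.

    # Hedged reconstruction of the naive worker (model and method names assumed).
    class TrendingTagsWorker
      include Sidekiq::Worker

      def perform(tweet_body)
        extract_hash_tags(tweet_body).each do |tag|
          record = HashTag.find_or_initialize_by(name: tag)
          record.update_ewma   # catch the rate up, then count this occurrence
          record.save!
        end
      end

      def extract_hash_tags(body)
        body.scan(/#(\w+)/).flatten
      end
    end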
Ah yes, the highlights. This is how one might implement the moving average function itself; it's mostly here for informational purposes. The most important part that I want to draw attention to is that in update_ewma, the very first thing we do is catch up. I didn't pass now here, but in theory it should be passed down to the catch-up, which keeps ticking to update the rate for every bucket we decided we're smoothing by. So every five seconds it re-updates the rate, down-weighting previous values, and goes on from there.
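Here's a sketch of what that catch-up might look like, assuming the record remembers when it last ticked; again, this is reconstructed from the description, not copied from the slide.

    # Sketch of update_ewma with catch-up (fields last_tick_at, rate, and
    # pending_count are assumed). BUCKET is the smoothing interval.
    BUCKET = 5 # seconds
    ALPHA  = 0.2

    def update_ewma(now = Time.now)
      catch_up(now)
      self.pending_count += 1
    end

    # Tick once per elapsed bucket, so the rate decays even while counting.
    def catch_up(now)
      while last_tick_at + BUCKET <= now
        self.rate          = ALPHA * pending_count + (1 - ALPHA) * rate
        self.pending_count = 0
        self.last_tick_at += BUCKET
      end
    end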
This exposes a problem with our first version of the worker: if we don't receive any Tweets with a given hash tag, how do we handle that? We still need to update the record in order to keep the rate current; since we're not saving any new data for that tag, we have to start lowering its weight some other way.
So we'll end up having to implement something like this, scheduled via cron or some other regularly scheduled task that probably has to run every bucket interval. The first step is to delete all the existing tags that are way too old, just to clear out the database. Then load up all the tags and update their rates, making sure they're caught up. One thing to point out: we now have a task that reads literally every record from the database every tick. There's probably a better way to do this, but that's not the point.
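As a sketch, that scheduled task might look like this, under the same assumed model:

    # Illustrative scheduled task: prune dead tags, then decay every rate.
    class DecayHashTags
      STALE_AFTER = 3600 # seconds; entirely arbitrary here

      def perform
        HashTag.where("updated_at < ?", Time.now - STALE_AFTER).delete_all

        HashTag.find_each do |tag|
          tag.catch_up(Time.now) # ticks the empty buckets, lowering the rate
          tag.save!
        end
      end
    end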
So, is it web scale yet? Well, let's look. So far, not yet. But what we can do is start scaling out our workers. Let's spawn three workers; now we're ready to handle fifty thousand Tweets per second. Well, maybe not, but we have more tricks up our sleeve. Like I mentioned, we know the database is going to be a pretty big bottleneck, because for every single Tweet we get, we have to read state, process it, and write it back.
What if we could remove the state read by caching in memory? Here's a super simple implementation: we build an in-memory cache of hash tags in the process, so that every time a Tweet comes in, we first check our cache before creating a new record. We update the cached record and save it back to the database.
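In code, that change might amount to something like this hypothetical tweak to the worker above:

    # Hypothetical in-process cache added to the worker above.
    def hash_tag(name)
      @cache ||= {}
      @cache[name] ||= HashTag.find_or_initialize_by(name: name)
    end

    def perform(tweet_body)
      extract_hash_tags(tweet_body).each do |tag|
        record = hash_tag(tag) # hits the database only on a cache miss
        record.update_ewma
        record.save!
      end
    end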
So let's run through this and see what happens. Here's the first Tweet coming through. It comes in through the queue, fancy animations, yeah, and it goes to the first worker. The worker processes it: OK, my cache is empty, I'm going to check the database. Nothing for RailsConf in the database, so I'll create it in memory with a count of one and write it to the database. OK.

The next Tweet comes in: ah, what's for lunch at RailsConf? Well, we know now. It gets queued up again, and I'm sure you know where this is going. It goes to the second worker, which reads the count, increments it to two, and writes it back to the database.

And here's the third Tweet. This one goes back to the first worker, which says: oh, I have everything cached, I don't need to read from the database. It bumps its stale cached count to two and writes that back, stomping on what the second worker just wrote. Yes. This. I did this way too late; I was like, ah, I'm just going to do fancy animations.

All right. So caching in this kind of system is not obvious. We can still make it work; there are many things we could do. We could push the cache external, we could use memcache, but then we still have an external process that's required for coordinating the work. The main thing is: even though we have many worker jobs, there is still a high amount of coordination required to get this to work. OK.
Enter Storm. As you can probably guess, all these problems are going to magically go away when we use Storm. I wish. But there is something we can do. Let's start with some abstractions. The core abstractions in Storm are streams and tuples. A stream is, essentially, a series of tubes through which data flows. But they call them streams. Tubes would have been better.
And tuples are just lists of values, and these values can be anything; they're really just the messages. You can put strings in tuples, you can put integers, you can put any object you want, as long as you can serialize it. And you're allowed to specify custom serialization, so as long as you can serialize an object to JSON, or to any serialization format, you can put it in a tuple.
The rest of Storm is a bunch of primitives that take these streams and tuples and transform the data from one representation to another.
Next, spouts and states. Spouts are the sources of streams. This is how you get data into Storm; it's the starting point of a stream. Anything you want to read from the outside world ends up represented as a spout. That can be reading from Redis, reading from SQS, reading from anything. You could implement a spout that reads directly from the Twitter API, a spout that reads from the database, that makes HTTP requests, that gets the time of day, even. And yes, you will actually want to implement a spout that gets the current time of day; I'll talk about that later.
States are the opposite. State is how you get the results of the transformations out of Storm. If you want to write outside of Storm, write to the database, make HTTP POST requests, send email, push to external queues, anything like that, it's going to be implemented as a state. So, so far this is what we know about Storm.
We have a spout. It starts a stream. Data flows through it and ends up at the state. So far, nothing super interesting.
Transforms. The rest is really about how we transform the data as it flows through, and transforms are purely functional operations on the data: given the same inputs, they output the same values.
So let's add a couple of transforms here. We have a spout, which sends some data, and we can filter if we need to. So we'll filter some data out and we'll aggregate it, and the data will flow through, end up at the state, and get persisted.
And then more transforms. From here, what you end up doing is modeling your data in terms of dataflow to get to the end point. There's really no limitation here; granted, you don't add a million spouts, because that would probably be crazy and you wouldn't be able to understand the code. But besides that, you can add as many spouts as you want, and as many state end points. You can split streams, you can join them, and do whatever transformation you want. Storm takes care of figuring out how to take this definition and distribute it across your cluster.
This entire graph, starting at the spouts and ending at the states, is called the topology, and in a bit I'm going to show how to define one.
OK, let's break it down a little bit. The spout is where it starts, and it emits tuples to the filter. Generally the spout is going to be a pretty standard thing. For example, if you're pulling data from Redis, you'll use a Redis spout that already exists, and your definition will literally just use the Redis spout. Filters, and transforms in general, can usually be abstracted into higher-level concepts and shared.
For example, this is how you might implement the my-filter transform for Storm. The base function is provided as part of the Storm API, and the requirement is that you define a single method that takes the input tuple and an output collector, which represents the stream. You get a tuple in, you process it, and you emit zero or more output tuples onto the output stream. The first step in the filter is to get the message out of the tuple. It's a somewhat Java-ish API, but it's possible to shim: getValueByField. In this case we're treating the tuple more as a hash map than anything else. So we get the message. Is the message awesome? If so, we emit it on the output stream. And that's all it takes to implement a very simple transform.
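Pieced together from that description, the filter might look roughly like this in JRuby. The class path is from the Storm 0.8-era Trident API, and awesome? stands in for whatever predicate you're filtering on; treat the whole thing as a sketch.

    java_import 'storm.trident.operation.BaseFunction'

    # Sketch of a simple filter transform: emit the tuple only if it matches.
    class MyFilter < BaseFunction
      def execute(tuple, collector)
        message = tuple.get_value_by_field('message')
        collector.emit([message]) if message.awesome?
      end
    end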
OK. Here's another example. This is not something to run in production, but it's super helpful for debugging.
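Something like this, a sketch of a pass-through transform that logs every tuple and re-emits it:

    # Illustrative debugging transform: log each tuple, then pass it along.
    class LoggerFunction < BaseFunction
      def execute(tuple, collector)
        puts tuple.to_s
        collector.emit([tuple.get_value_by_field('message')])
      end
    end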
the
topology. Like, define how all the different,
like, transforms
and spouts and everything hook up together.
The important part is on the screen. The define_topology method itself doesn't matter. The main point is that you get a topology object, and on it you use a series of APIs to define how things flow together. You start by defining the stream, and you name it, here, my-spout. In this case I'm saying MyQueueSpout, but it would be something like RedisSpout.new, specifying wherever the server is and the topic you want to read from. Or, if you're using SQS, you'd pass in your credentials and the topic you're reading from, et cetera.
Next step. Usually the spouts just output tuples of raw bytes; the spout gets the raw bytes from Redis and outputs them, so the next step is to deserialize those raw bytes. We're saying: we expect, as input, a tuple that contains the field bytes; we're going to pass it to the queue message deserializer; and the output is going to be a tuple that contains message. I included an example implementation underneath. Then we chain that: for each tuple we get, we expect a tuple containing the field message, we pass it to my-filter, which filters based on our predicate and outputs another tuple containing a message. And finally we pass it to the logger.
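Put together, the definition being described might look roughly like this; the field and class names come from the narration, so treat them as approximate:

    java_import 'backtype.storm.tuple.Fields'

    # Rough shape of the topology described above (Trident-style, via JRuby).
    def define_topology(topology)
      topology.new_stream('my-spout', MyQueueSpout.new)
              .each(Fields.new('bytes'),   QueueMessageDeserializer.new, Fields.new('message'))
              .each(Fields.new('message'), MyFilter.new,                 Fields.new('message'))
              .each(Fields.new('message'), LoggerFunction.new,           Fields.new('message'))
    end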
Next step: run it. The easiest way to get this running, just for playing around, is to run it locally. Redstorm does this for you, but the actual amount of code to get it started is just this. You initialize a new cluster object. You define a new TridentTopology; I'm going to wave my hands over what Trident is, you can read the wiki, and it doesn't really matter for this talk. You initialize a config, and then there's our define_topology method: you pass in the topology and submit it to the cluster, which is just your locally running machine. And the entire thing is running. Winning.
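The local-mode boilerplate amounts to something like this sketch (class names are from the Storm 0.8-era API):

    java_import 'backtype.storm.LocalCluster'
    java_import 'backtype.storm.Config'
    java_import 'storm.trident.TridentTopology'

    # Sketch of running the topology on an in-process local cluster.
    cluster  = LocalCluster.new
    topology = TridentTopology.new
    config   = Config.new

    define_topology(topology)
    cluster.submit_topology('trending-topics', config, topology.build)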
But only a little bit winning, because we're not doing anything with the data yet. The next step is to persist the results. Again, there are higher-level things; there are already existing libraries where you take in these tuples and throw them into Redis with no code at all. But I'm going to jump directly into how you'd implement a state from scratch.
What it takes is: you make a class and inherit from State, which is also provided as part of the Storm API. The only methods you have to define are beginCommit and commit, and I'm going to cover those later. Besides that, you provide whatever you want as the API for interacting with your state, because the next component is going to be the state updater.
The state updater requires you to define the method updateState, which is passed the instance of your state and the input tuples, plus an output collector in case you want your state to emit more tuples. In our case, all we're going to do is get the message out of each tuple and call persist_awesomely, and that's it for now. Except, because it's Java, there's also a factory. But let's not mention that.
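A sketch of those pieces; the interface shapes follow Trident's State, StateUpdater, and StateFactory, but the details here are approximate:

    # Sketch of a from-scratch state, its updater, and the obligatory factory.
    class AwesomeState
      # Transaction hooks; covered later, so left empty for now.
      def begin_commit(txid); end
      def commit(txid); end

      def persist_awesomely(message)
        # write to the outside world: database, HTTP, queue, ...
      end
    end

    class AwesomeUpdater # plays the role of Trident's StateUpdater
      def update_state(state, tuples, collector)
        tuples.each do |t|
          state.persist_awesomely(t.get_value_by_field('message'))
        end
      end
    end

    class AwesomeStateFactory # because it's Java, a factory builds the state
      def make_state(conf, metrics, partition_index, num_partitions)
        AwesomeState.new
      end
    end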
Next, we add the state to the topology we're defining. All I did was replace the logger: instead of logging, I call partitionPersist, passing in the factory, which will build the state; saying that I expect a tuple that has the field message; and passing the basic updater. And that's it for getting a very basic topology running.
So the next step: let's go back to our initial Twitter hash tag example. This is what it might look like as a Storm topology. Ah, many mights. You'd start with a Tweet spout, however you decide to get your Tweets in: via Redis, via the Twitter API directly, whatever. You'd pass that to a transform whose only goal is to take the Tweet body, pull out the hash tags, and output them as tuples. The next step is an aggregate, which takes all the hash tags and tracks how many counts there are. Then it sends that to the state, and the state does the moving average calculation.
This is what it might look like. Again, pretty basic. ExtractHashTags inherits from the base function. We define execute, and what it does is get the Tweet body out of the tuple, extract the hash tags (I believe it's the same code as before), loop over them, and emit new tuples.
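As a sketch, again reconstructed rather than copied:

    # Sketch of the hash tag extraction transform described above.
    class ExtractHashTags < BaseFunction
      def execute(tuple, collector)
        body = tuple.get_value_by_field('tweet')
        body.scan(/#(\w+)/).flatten.each do |tag|
          collector.emit([tag]) # one output tuple per hash tag
        end
      end
    end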
Next, the aggregator. An aggregate function is basically like Ruby's inject on Enumerable. In the init method, you return the initial state. Then aggregate is the iteration step: it's passed the state that's being built up, each tuple, and, again, an optional output collector you could emit tuples to mid-iteration.
Finally, there's a complete method: we're done aggregating, so let's output our aggregation as tuples to the stream. In this case we loop over our summary, which is a hash map of hash tag to count, and output tuples containing the hash tag and the count.
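Reconstructed from that description, the aggregator might look something like this (Trident's Aggregator interface, approximately):

    java_import 'storm.trident.operation.BaseAggregator'

    # Sketch of the per-batch hash tag counter.
    class CountHashTags < BaseAggregator
      # Fresh accumulator at the start of each batch.
      def init(batch_id, collector)
        Hash.new(0)
      end

      # The iteration step: bump the count for this tuple's hash tag.
      def aggregate(counts, tuple, collector)
        counts[tuple.get_value_by_field('hashtag')] += 1
      end

      # The batch is done: emit one (hashtag, count) tuple per entry.
      def complete(counts, collector)
        counts.each { |tag, count| collector.emit([tag, count]) }
      end
    end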
And hook it all up; pretty similar. The main thing with aggregate transforms is that you call partitionAggregate, and I'll cover that in a bit. At the same time, we'll add our trending topic state.
This one might be implemented like this. TrendingTopicState inherits from State, exactly the same as before. I implement beginCommit and commit, leave them empty for now, and come back to them. The update method takes the hash tag and the count, and for now it does the exact same work the Sidekiq version did: find the existing hash tag record by name, update the moving average, and save it again. And the updater takes in all the tuples and just passes them to the state.
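A sketch of this first version, with the transaction hooks still stubbed out:

    # Sketch of the first trending topic state: the same read/update/write
    # cycle as the Sidekiq worker, just driven by Storm.
    class TrendingTopicState
      def begin_commit(txid); end # filled in shortly
      def commit(txid); end       # filled in shortly

      def update(hash_tag, count)
        record = HashTag.find_or_initialize_by(name: hash_tag)
        record.update_ewma(count) # fold this batch's count into the average
        record.save!
      end
    end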
Now, you might be wondering: I thought streams were unbounded sequences of tuples. If that's the case, when does one call the complete method? Infinite time from now? No. The way Storm executes its topology is in batches.
The way it works is that the Storm coordinator tells your spout, and here we go, I'm just going to read what it says: yo spout, start batch id 123. The spout says: OK, cool, we're starting batch 123, let's get some data for it. I fetched two hundred tuples from whatever source; say I popped two hundred messages off of Redis. Seems good. That then goes through everything we just saw: the messages get sent down to the hash tag function, which goes to the aggregate function, which goes to the state. The state persists the result and sends an acknowledgment back to Storm.
Since the state can tell Storm "we completed batch 123", what that means is that every part of the API that appears to be time-based, like beginCommit, commit, and, in our case, the complete method on the aggregate function, is called after every single batch. So when we say aggregate all the counts of the hash tags, we're really just doing it for one batch. And then it starts the next batch; this just says: start batch 124.
So, is it web scale yet? This is obviously the question you have to ask after every single hour of work. Well, this is what I've got so far, and what I told you was happening: we've got a Tweet spout sending data down a single stream into the extract hash tags transform. So we have one stream. Can a single stream handle fifty thousand Tweets per second? Let's break it down a bit.
This is the most basic topology. It's what I said happens, and it is what conceptually happens. But the way it actually executes, a stream has many partitions. The number of partitions is configurable, but every partition is completely independent, so it can run on completely different nodes in your Storm cluster. If you have eight partitions, you have the opportunity to read from the spout on eight different servers, and to transform and persist on eight different servers. You can have a thousand partitions if that's how far you need to go.
The main point is that even though you have one conceptual stream, you can split the workload across many partitions, and thus across servers and threads. So let's work through the hash tag example again with partitions. We have the spout, and a RailsConf tag comes out of the spout in one partition. Because we haven't said anything yet, it just gets sent to the aggregate transform in the same partition, and it goes to the state on the same partition as well.
Here, on the left side, is another RailsConf tag. It's in a completely different partition, so possibly on a different server. It goes to the aggregate on a completely different server, and ideally, at the very end, we'd want to do the moving average calculation on a single server; ideally it would end up on the same state partition. But if we have another tag, say hash tag sleep, something I would like to do, it stays on its own partition. It doesn't need to run on the same server that processes the RailsConf hash tag.
But that one move, from the aggregate's middle partition up to a single combined state partition at the end point, is something we haven't defined yet.
The way you do that is, at any point, you can say: here, I want to partition my tuples by hash tag. When we say that, we're specifying a field on the tuple, and we're saying: hey Storm, once we cross this point, we'd like any tuple with the same hash tag value to end up in the same partition. Up to now, Storm could actually have run the entire topology in the same partition on one server; there was no need for a Tweet to hit the network and go to a different node (you could force it to, but there's no need). The moment you add a partition-by statement, you're saying: OK, at this point, make sure all tuples with the same hash tag value end up on the same server. Storm will make sure that happens, and thus may hit the network to do it.
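In the topology definition, that's one extra call in the chain; something like this sketch, with the placement based on my reading of the diagram:

    # Route equal hash tags to the same partition before counting them.
    stream.each(Fields.new('tweet'), ExtractHashTags.new, Fields.new('hashtag'))
          .partition_by(Fields.new('hashtag'))
          .partition_aggregate(Fields.new('hashtag'), CountHashTags.new,
                               Fields.new('hashtag', 'count'))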
So how many partitions should we run? This is very use-case specific. You could run eight; you could run five hundred twelve; it really depends on your use case. But to answer this better, we need to talk a bit about how Storm schedules partitions on the cluster. First of all, there are many servers; you have a cluster, you have many servers, distributed, yay. Say, three servers, and each server can have many threads, because Storm is very, very concurrent.
So you might have a configurable number of threads per server, thirty, forty. And there can be many partitions per thread as well. A thread in Storm is called an executor, and an executor can have an arbitrary number of partitions. Say you want to boot up a Storm cluster with three nodes, nine partitions, and three threads; it works out pretty nicely. This is how it might start out initially.
So you boot up, and Storm assigns three partitions per thread, per server. The nice thing about dealing in partitions is that it's what basically everything that ends up being seriously distributed does: Riak, Cassandra, Kafka, they all go via partitions, because it works out very nicely. But that's a little tangent.
So, what happens when you lose a server? Oh, we don't have those executors anymore, but all their partitions still need to be handled. Storm redistributes them across the existing servers: all these partitions, which represent workload, get moved to the other servers.
And, inversely, that little box on top is a server; say we're adding a server to our cluster. We rebalance, and Storm takes the existing partitions and redistributes them across the available servers. Probably more evenly than my little graph shows. OK.
It's time for some real talk: failure. The question is not whether there will be failure; the question is how we handle it. If you're building a distributed system, at some point something is going to fail, and you can either close your eyes and pretend it doesn't happen, or you can think through the problems: how will we recover? Is the system going to be in an inconsistent state afterwards? Are we going to lose availability during that time?
Handling failure properly is probably the hardest part of building any distributed system. And you don't have to be using really fancy technologies to be building a distributed system: a plain old Rails app is one, with the browser talking to the Rails app. All these problems exist there too; maybe we don't think about them as much because we're OK with higher error rates and inconsistencies.
like, even a
simple case like a signup form, like the user
fills up the signup form and hits submit and
it errors out, what happened, right? Did the,
did
it fail reaching our server? Did it reach
our
server and we started processing it and, but
we
had an exception? Did we, like, actually create
the
account in the database and the next time
the
user is going to refresh the page it'll add,
like, I'd like, I'd like to really signup.
I'm
gonna try to signup again.
And it's gonna be, oh no, you can't signup.
Your email is taken. Well, OK. How do we
handle all this? And there are ways. I'm just
kind of bringing up this simple case, which
I'm
sure everybody here has properly protected
their Rails app
against, because nobody in this room has ever
had
this bug happen.
But I bring it up because it's a relatively simple case, and it really just gets more complicated from here. For example, in Storm, the whole point is that you may have one input tuple, one message popped off the queue, and as you process it you end up generating more tuples. For example, from one Tweet we're going to extract many hash tag messages. As you build more and more complex topologies, this fans out.
So what happens if this one dies? Everything else succeeded, but that one, and all of its children, didn't. What do we do then? Do we retry just that one tuple? If we do, we have to track that it was that specific one that failed, which involves a lot of bookkeeping. And what happens if tuples have multiple parents, and one parent succeeded but not the other? That's possible with Storm: tuples can have multiple parents, because you can have joins. So how does one process that? Do you retry the entire batch? It's hard.
I will get to how you handle it. The details of how Storm does this internally are really interesting, and I'd recommend you go read the wiki page on it, but I don't have time to cover it here. Going back to the original Sidekiq example: what happens if we're halfway through the iteration, we've persisted some of the tags, and it just explodes? Do we retry the message? I don't know. What do? What now?
Generally, when we're using a system like Sidekiq or Storm, we have to think about the message processing guarantees, because they're going to change how we work with it. The two main ones are: are messages processed at least once, or are messages processed at most once? And you can only get one, not the other. If anyone says, ah, we guarantee that a message is going to be processed exactly once, they're lying. It's impossible.
So what we can try to do, well, Storm is at-least-once. What we can try to do is handle the case where messages show up multiple times. Can we avoid redoing the processing? Can we try to ensure that we end up as close as possible to processing exactly once?
The way Storm approaches this is really pretty nice. Like I said, everything is processed in batches. Storm guarantees that the next batch will not reach completion until the previous one is fully committed, and it hands out monotonically increasing batch ids: the next batch id is guaranteed to be bigger than the previous one.
We can combine these properties. I'll Tweet my slides, which will be easier to read, but the main thing is that I'm now introducing beginCommit, and I'm running out of time, so I'm going to try to get through this. In beginCommit we get the transaction id, and what we can do is store it off and load up the state; we'll talk about that later. The important part is here, in the update function: we don't actually write to the database anymore.
We just store the update in memory. And the main line is: return if the hash tag's last transaction id equals the current transaction id. What that means is, since we know the guarantees, if for some reason a message failed and we reprocess the batch, we can know that this hash tag already made it past a certain point, so we don't need to recompute the moving average for it. That gets us as close as possible to exactly once.
So we store all of this in memory, and once the commit happens, which is at the end of the batch, we do all the work that hits the outside world, which is writing to the database. If we reach completion, the batch completes successfully, and we know all the messages have been successfully processed. But if it fails during the persist, half of the records will have the new transaction id and half will have the old one; when the batch gets replayed and we reload them, we'll see: ah, this record already finished this batch.
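Pulling that description together, the transactional version of the state might look like this sketch (the slide isn't reproduced here, so the names are approximate):

    # Sketch of the transactional trending topic state described above.
    class TrendingTopicState
      def begin_commit(txid)
        @txid    = txid
        @pending = {} # in-memory updates for the current batch
      end

      def update(hash_tag, count)
        record = @pending[hash_tag] ||= HashTag.find_or_initialize_by(name: hash_tag)
        # On a replay, skip records that already made it through this batch.
        return if record.last_transaction_id == @txid

        record.update_ewma(count)
        record.last_transaction_id = @txid
      end

      # Only commit touches the outside world. If it explodes halfway through,
      # replaying the batch is safe: the check above skips the persisted half.
      def commit(txid)
        @pending.each_value(&:save!)
      end
    end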
To roll through this real quick, since I believe I'm getting really close to, or past, my time: let's get data. I fetch two hundred tuples. Boom. We go through. Aggregate. And it explodes. Eventually the Storm coordinator, up above, says: guys, I did not get a completion message. All right, fine. Spout, we're going to restart batch 123, because I did not get completion. And the Tweet spout says: OK, I've got this. I'm going to re-emit exactly the same two hundred tuples in the same order as before.
What that means is that if we successfully implemented our transforms to be purely functional, completely deterministic based on their input, then as we go through, we get exactly the same outputs. So once we get to the state, in theory everything is exactly the same, and when we actually run the persist, if something was already successfully persisted and we see the same transaction id, we know it would have been exactly the same output.
All right. The main catch is, of course, that transforms must be purely functional, because otherwise this won't work. That means never using Time.now within your transforms. I actually changed this on the last slide, but I had to zoom through it. The main thing is: if you need the current time for a batch, one way to do it is to have a spout that only emits the current time. If the batch gets re-emitted, the spout knows; it saves off somewhere: for this batch id, I emitted this time.
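A sketch of the idea; a real Trident spout has more ceremony than this, but the core is remembering what you emitted per batch id:

    # Illustrative time spout: a replayed batch sees its original timestamp.
    class TimeSpout
      def initialize
        @times = {} # batch id => time first emitted (persist this in practice)
      end

      def emit_batch(batch_id, collector)
        time = @times[batch_id] ||= Time.now
        collector.emit([time])
      end
    end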
The next thing is that this requires the spouts to re-emit exactly identical batches, which, unfortunately, I think most spouts don't support; Redis, for example, does not. To get this level of guarantee, the only queue is Kafka. I'd love to talk more about Kafka and why it's so awesome, but we can do that after. In general, even if you're not using Storm, Kafka is a really amazing tool that I highly recommend.
So, TL;DR. Storm is a really powerful platform for writing workers. It's really great for stateful jobs, and by stateful jobs I mean jobs that depend on the results of previous ones. And it's good for complex data processing flows. And that's it, I think. My time wasn't up here, so I'm guessing this is about right.
Am I really early, or late? OK. Cool. I didn't have a clock up here either, so I was guessing. All right. Well, I'm done. Thanks.