CARL LERCHE: My name is Carl Lerche, and I'm going to be talking about Apache Storm. I'm shooting for forty minutes. It's going to be a long talk, I'm going to try to cram everything in, and I already got a late start, so there probably won't be time for questions. I'm also probably going to gloss over a few hand-wavy things, so if a question comes up, just send me a Tweet directly and I'll respond after the talk. Or just come and talk to me.

So I work for Tilde, and our product is Skylight: a smart profiler for your Rails app. I've actually built the entire backend using Storm, so if you want to talk about how that works, how I've ended up using Storm in the real world, I'll be happy to talk about that too.

So what is Storm? Let's start with how it describes itself on the web site. It calls itself a "distributed realtime computation system," and that sounds really, really fancy. The first time I thought about using it, I went to see what Storm was all about, saw that, and thought: whoa, step back. This sounds like too-serious business for me; I need something simpler. It took maybe another six months of, OK, I'm going to take the time and learn this, and as I got to know it, it came to me: oh, this can be used for many, many other use cases. If there's time at the end I might talk about some of them, but I'm going to try to get into the nitty gritty sooner rather than later.

For now, I'm just going to say that Storm is a really powerful worker system. Some things about it. It is distributed. It's very, very, very distributed, which is a good and a bad thing. The good thing is you can spread your load across many servers. The bad thing is your ops setup is going to look something like this, which can turn out to be somewhat of an operational headache. You'll end up having to run a Zookeeper cluster. You'll end up having to run the Storm Nimbus process somewhere, and then there are a number of other Storm coordination processes, and every server will have a worker process. And then, while you're at it, you've gone this far, let's throw in a distributed database and some distributed queues too. The main point is there's going to be operational overhead, so don't do it just for fun. There needs to be a good reason. If you can get the job done on a single server, go for that.

It's fault-tolerant, and what I mean by that is it's able to recover and continue making progress in the event of a failure. I know a lot of existing systems claim to do this, but the reality is that handling faults is really, really hard. Storm doesn't make it seamless; the reality is it can't be seamless. But, and I'm going to talk about this later, I think the way Storm does it is about as easy as it can get. I hope.

It's really fast, with very low overhead. Some completely useless benchmark numbers: it's been clocked processing over a million messages per second on a single server. A completely useless number, but it sounds good.

Supposedly it's language agnostic, and by that I mean supposedly you can use absolutely any language at all to build your data processing pipeline. There are examples on the website where they use bash; I don't know why, it's probably a novelty. If you're into distributed bash, sounds good. I guess that means you could probably use MRI. However, I've personally only used it on the JVM. It's a JVM-based project: it's written in Java and Clojure, and it uses many Java libraries. So I'd recommend, if you're going to try it, try it out on the JVM.

The good news is we have JRuby, which has done an extremely good job of integrating with Java. So you can use these Java libraries and you're just writing Ruby. One of the things I'm going to hand-wave over is the details of getting it running on JRuby, but there is a GitHub project called redstorm; you can probably just Google "GitHub redstorm." I realized I didn't actually include any links, but again, I can find them later.

Redstorm is a JRuby binding to Storm, and it goes a bit further: it provides Ruby DSLs for everything related to Storm, so you can define all your data transformations in terms of Ruby DSLs.

The way I'm going to go over Storm is by building up a straw man, and that straw man is: how would one implement Twitter trending topics? I think we all hopefully know what a trending topic is; that's kind of why I picked it. Essentially, everybody Tweets, and Tweets can include hash tags: hash tag RailsConf, hash tag whatever. As the rate of a given hash tag increases, the hash tags occurring at the highest rates bubble up, and Twitter says: ah, these are the topics that are trending, so everybody can see what's going on. So around here, you might see that RailsConf is trending. Another reason I picked it is that it's very time-sensitive, which ties into the real time aspect of Storm.

All right. To implement this, the way I'm going to calculate the rate, meaning how often a hash tag is occurring, is with an exponentially weighted moving average. Fancy. Basically what it means is: as we process the Tweets, I'm going to count the number of occurrences of each hash tag in, say, five-second buckets (the exact interval doesn't really matter), and then average the buckets. But instead of a normal average, which is the sum of all the values divided by the number of values, we do a weighted average, which causes older occurrences to drop in weight exponentially. This is how Linux computes its one-minute, five-minute, and fifteen-minute load averages. It's a pretty nice way of smoothing out the curve, and you can tune how much smoothing you get.

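To make that concrete, here is roughly what the update looks like in Ruby. This is my own sketch rather than code from the slides, and the 0.3 smoothing factor is an arbitrary choice:

    # Exponentially weighted moving average: the new rate blends the
    # newest bucket count with the old rate. ALPHA tunes the smoothing;
    # a higher value makes older buckets decay faster.
    ALPHA = 0.3

    def update_rate(rate, bucket_count)
      ALPHA * bucket_count + (1 - ALPHA) * rate
    end

    rate = 0.0
    [3, 5, 0, 12].each { |count| rate = update_rate(rate, count) }
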
First, to build up some context, let's look at how one might implement this with Sidekiq, or Resque, or any other simpler system where a worker just pops messages off a queue.

This is what it might look like. You've got your Rails app, which pushes data into Redis, and then the worker says: OK, I'm going to pop a message off of Redis, read state straight from the database, process the message, and write straight back out to the database. The Rails app can then query the result by reading it straight from the database. Here's how it might run. Is that bright enough? Hopefully it is. The comment on the slide basically says: yes, this is very naive, I know it, but that's not the point. Don't say "oh, you could do this a million times faster"; that's not the point. Hopefully it's readable. The basic premise is you have an entry method which takes a Tweet as an argument. The first step is to extract the hash tags from the body, so I get an array of tags, and I loop over each tag and read from the database: is there already an existing record? If so, I update the moving average for it and then save it back.

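The slide itself isn't reproduced here, but based on that description the worker is roughly the following. A naive sketch: HashTag is a hypothetical ActiveRecord model, and update_ewma is the moving-average method sketched a bit further down:

    require 'sidekiq'

    class TrendingTopicsWorker
      include Sidekiq::Worker

      def perform(tweet_body)
        extract_hash_tags(tweet_body).each do |name|
          # Read straight from the database: is there an existing record?
          tag = HashTag.find_or_initialize_by(name: name)
          # Update the moving average for it...
          tag.update_ewma
          # ...and write it straight back.
          tag.save!
        end
      end

      def extract_hash_tags(body)
        body.scan(/#(\w+)/).flatten
      end
    end
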
Yeah, highlights. This is how one might implement the moving-average function; it's mostly here for informational purposes. The most important part I want to draw attention to is that in update_ewma, the very first thing we do is call catch_up. (I didn't pass the current time in on the slide, but in theory it should be passed down to catch_up.) catch_up keeps ticking, updating the rate for every bucket we're smoothing by. So every five seconds it re-updates the rate, down-weighting previous values, and goes on from there.

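Here's the shape of that code, as best I can reconstruct it from the description. A sketch: it assumes the record has rate, pending, and bucket_end attributes, five-second buckets, and the ALPHA constant from the earlier sketch:

    BUCKET_INTERVAL = 5 # seconds

    class HashTag < ActiveRecord::Base
      def update_ewma(count = 1, now = Time.now)
        # First, tick through any buckets that have elapsed...
        catch_up(now)
        # ...then record these occurrences in the current bucket.
        self.pending += count
      end

      def catch_up(now = Time.now)
        # For every bucket boundary crossed since the last update, fold
        # the bucket's count into the rate, down-weighting older values.
        while bucket_end <= now
          self.rate        = ALPHA * pending + (1 - ALPHA) * rate
          self.pending     = 0
          self.bucket_end += BUCKET_INTERVAL
        end
      end
    end
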
This exposes a problem with our first version of the worker: if we do not receive any Tweets with a given hash tag, how do we handle that? We still need to update the record so that the rate keeps moving; even though we aren't saving any new data, we have to keep lowering the weight of that hash tag.

So we'll have to end up implementing something like this, scheduled via cron or some other regularly scheduled task that probably has to run every bucket interval. The first step is to delete all the existing tags that are way too old, just to clear the database. Then load up all the remaining tags and update their rates, making sure they're caught up.

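Sketched out, assuming the HashTag model from above:

    # Runs every bucket interval, via cron or similar.
    class CatchUpHashTags
      STALE_AFTER = 3600 # seconds; by then the rate has decayed to ~zero

      def self.run(now = Time.now)
        # Step one: clear out tags that are way too old.
        HashTag.where('updated_at < ?', now - STALE_AFTER).delete_all

        # Step two: tick every remaining tag, so hash tags with no new
        # Tweets still get their rates lowered.
        HashTag.find_each do |tag|
          tag.catch_up(now)
          tag.save!
        end
      end
    end
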
One thing to point out: we now have a task that reads literally every record in the database on every tick. There's probably a better way to do this, but that's not the point.

So, is it web scale yet? Let's look. So far, not yet. But what we can do is start scaling out our workers: let's spawn three of them. Now we're ready to handle fifty thousand Tweets per second. Well, maybe not, but we have more tricks up our sleeve. Like I mentioned, we know the database is going to be a pretty big bottleneck, because for every single Tweet that comes in, the worker has to read state, process it, and write it back.

What if we could optimize the, and, remove
the
-
read state and, by caching a memory. Super
simple
-
implementation, which more is represented
as like, oh, unless
-
like, we basically build an in memory cache
of
-
hash tags in the process, such that every
time
-
we get a Tweet, it comes in and we
-
first check our cache before trying to, making
a
-
new one. And we update that and save it
-
back to the database.
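In sketch form, the cache is just a hash sitting in front of the database read:

    # Per-process cache: skips the read, but every worker now holds its
    # own private copy of the state, which is exactly the problem below.
    def cached_hash_tag(name)
      @cache ||= {}
      @cache[name] ||= HashTag.find_or_initialize_by(name: name)
    end
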
So let's run through this and see what happens. Here's the first Tweet coming through. It comes in through the queue (fancy animations, yeah) and goes to the first worker. The worker processes it: OK, my cache is empty, I'll check the database. Nothing for RailsConf in the database, so it creates the record in memory with a count of one, and writes it to the database.

The next Tweet comes in: ah, what's for lunch at RailsConf? Well, we know now. It gets queued up again, and I'm sure you know where this is going. It goes to the second worker, which reads the count from the database, increments it to two, and writes it back.

And here's the third Tweet. This one goes back to the first worker, which says: oh, I have everything cached, I don't need to read from the database. It bumps its cached count to two and writes that, but the real count at this point should be three; the first worker's cache never saw the second worker's write. (I did these fancy animations way too late.)

So, caching in this kind of system is not obvious. We can still make it work; there are many things we could do. We could push the cache external, we could use memcache, but then we still have an external process required for coordinating the work. The main thing is: even though we have many worker jobs, there is still a high amount of coordination required to get this to work. K.

Enter Storm. As you can probably guess, all these problems are going to magically go away when we use Storm. I wish. But there is something we can do. Let's start with some abstractions. The core abstractions in Storm are streams and tuples. A stream is, essentially, a series of tubes through which data flows. They call them streams; tubes would have been better.

Tuples are just lists of values, and the values can be anything; they're really just the messages. Into a tuple you can put strings, you can put integers, you can put any object you want, as long as you can serialize it. You're allowed to specify custom serialization, so as long as you can serialize an object, to JSON or to any other serialization format, you can put it in a tuple.

The rest of Storm is a bunch of primitives for taking these streams of tuples and transforming the data from one representation to another.

Next: spouts and states. Spouts are the sources of streams; this is how you get data into Storm, the starting point of a stream. Anything you want to read from the outside world ends up represented as a spout. That can be reading from Redis, reading from SQS, reading from anything. You could implement a spout that reads directly from the Twitter API, a spout that reads from the database, one that makes HTTP requests, even one that gets the time of day. And yes, you will actually want to implement a spout that gets the current time of day; I'll talk about that later.

States are the opposite. A state is how you get the results of the transformations out of Storm. Anything that writes to the outside world, whether that's writing to the database, making HTTP POST requests, sending email, or pushing to external queues, gets implemented as a state. So, so far, this is what we know about Storm.

We have a spout. It starts a stream. Data flows through it and ends up at the state. So far, nothing is super interesting.

Transforms. The rest is really about how we transform the data as it flows through, and transforms are purely functional operations on the data: given the same inputs, they output the same values.

So let's add a couple of transforms here. We have a spout, and it sends some data. We can filter if we need to, so we'll filter some data out, and then we'll aggregate it. The data flows through, ends up at the state, and gets persisted.

And then more transforms. Basically, from here, what you end up doing is modeling your data in terms of dataflow from start to end point. And there's really no limitation. Granted, don't add a million spouts, because that would probably be crazy and you wouldn't be able to understand the code, but besides that, you can add as many spouts as you want, you can add as many state end points, you can split streams, you can join them, and you can do whatever transformation you want. Storm takes care of figuring out how to take this definition and distribute it across your cluster.

This entire graph, starting at the spouts and ending at the states, is called the topology, and in a bit I'm going to show how to define one.

OK, let's break it down a little bit. The spout is where it starts, and it emits tuples to the filter. Generally the spout is a pretty standard thing. For example, if you are pulling data from Redis, you'll use a Redis spout that already exists, and in your definition you'll literally just use that spout. Filters, and transforms in general, can usually be abstracted into higher-level concepts and shared as well.

So, for example, this is how you might implement the MyFilter transform for Storm. The base function is provided as part of the Storm API, and the requirement is that you define a single method that takes the input tuple and the collector that represents the output stream. You get a tuple in, you process it, and you emit zero or more output tuples onto the output stream. The first step in the filter is to get the message out of the tuple. This is a somewhat Java-ish API, but it's possible to shim: get-value-by-field. In this case we're treating the tuple more as a hash map than anything else. So we get the message; is the message awesome? If so, we output it on the output stream. And that's all it takes to implement a very simple transform.

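In JRuby it looks roughly like this. Treat the names as approximations of the Trident API (in Java this is storm.trident.operation.BaseFunction, with an execute method taking the tuple and a collector); the awesome? predicate is obviously made up:

    class MyFilter < BaseFunction
      def execute(tuple, collector)
        # The tuple acts like a hash map: pull the field out by name.
        message = tuple.get_value_by_field('message')

        # Emit zero or more tuples onto the output stream.
        collector.emit([message]) if message.awesome?
      end
    end
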
Here's another example. This is not something to run in production, but it's super helpful for debugging.

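Same shape as the filter, but it just prints whatever flows past. A sketch:

    class LoggerFunction < BaseFunction
      def execute(tuple, collector)
        # Print every tuple that flows past; emit nothing.
        puts tuple.inspect
      end
    end
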
All right, the next step is to define the topology, that is, to define how all the different transforms and spouts and everything hook up together.

The important part is on the screen. The define_topology method itself doesn't matter; the main point is that you get a topology object, and on it you use a series of APIs to define how things flow together. You start by defining the stream and naming it, "my-spout". In this case I'm saying MyQueueSpout, but it would be something like RedisSpout.new, specifying wherever the server is and the topic you want to read from. Or, if you're using SQS, you'd pass in your credentials and the topic you're reading from, et cetera.

Next step: usually spouts just output tuples of raw bytes. The spout will just get the raw bytes from Redis and output them, so the next step is to deserialize them. We're saying: as input, we expect a tuple that contains the field bytes; we're going to pass it to the queue-message deserializer; and the output will be a tuple that contains message. (I included an example implementation underneath.) Then we chain that: for each tuple, we expect a tuple containing the field message, we pass it to MyFilter, which filters based on our predicate and outputs another tuple containing message. And finally we pass it to the logger.

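Put together, the definition reads something like this sketch, where fields(...) is a little helper standing in for Storm's Fields lists:

    # Helper: wrap field names in Storm's Fields class
    # (backtype.storm.tuple.Fields in the version I was using).
    def fields(*names)
      Java::BacktypeStormTuple::Fields.new(*names)
    end

    def define_topology(topology)
      topology.new_stream('my-spout', MyQueueSpout.new)
        .each(fields('bytes'),   QueueMessageDeserializer.new, fields('message'))
        .each(fields('message'), MyFilter.new,                 fields('message'))
        .each(fields('message'), LoggerFunction.new,           fields)
    end
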
Next step: run it. The easiest way to get this running, just for playing around, is to run it locally. Redstorm does this for you, but the actual amount of code to get it started is just this. You initialize a new cluster object. You define a new TridentTopology. (I'm going to wave my hands over what Trident is; you can read the wiki, and it doesn't really matter for this talk.) You initialize a config, and then there's our define_topology method. You pass in the topology and then you submit it to the cluster, which is just your locally running machine. And the entire thing is running. Winning.

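In code, the local-mode boilerplate is about this much. A sketch using the Java class names from the Storm version I remember:

    # LocalCluster runs a whole in-process Storm "cluster".
    cluster  = Java::BacktypeStorm::LocalCluster.new
    topology = Java::StormTrident::TridentTopology.new
    config   = Java::BacktypeStorm::Config.new

    define_topology(topology)

    # Submit it, and the entire thing is running locally.
    cluster.submit_topology('trending-topics', config, topology.build)
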
But only a little bit winning, because we're not doing anything with the data yet. The next step is to persist the results. Again, there are higher-level pieces for this; there are existing libraries where you take in tuples and throw them straight into Redis with no code at all. But I'm going to jump directly into how you would implement a state from scratch.

What it takes is: you make a class and inherit from State, which is also provided as part of the Storm API. The only methods you have to define are begin_commit and commit, and I'm going to cover those later. Besides that, you provide whatever you want as the API for interacting with your state, because the next component is the state updater.

The state updater requires you to define the method update_state, which is passed the instance of your state and the input tuples, plus an optional output collector in case you want your state to emit more tuples. In our case, all we're going to do is get the message out of each tuple and call persist_awesomely. That's it for now.

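As a sketch (persist_awesomely and the Message model are made-up names for whatever your state actually does; in Java the pieces are storm.trident.state.State and BaseStateUpdater):

    # The state: begin_commit/commit are required; the rest is up to you.
    class AwesomeState
      include Java::StormTridentState::State

      def begin_commit(txid); end
      def commit(txid); end

      def persist_awesomely(message)
        Message.create!(body: message)
      end
    end

    # The updater: receives the state plus the batch's tuples.
    class AwesomeUpdater < Java::StormTridentState::BaseStateUpdater
      def update_state(state, tuples, collector)
        tuples.each do |tuple|
          state.persist_awesomely(tuple.get_value_by_field('message'))
        end
      end
    end
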
Except, because it's Java, there's also a factory, but let's not dwell on that. Here I just want to add the state to the topology we're defining. All I did was replace the logger step: instead of logging, I call partition_persist, passing in the factory, which builds the state, saying that I expect a tuple that has the field message, and using the basic updater.

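That last step of the chain, sketched (AwesomeStateFactory is the Java-required factory that builds the state):

    # Instead of the logger step: persist each batch through the state.
    stream.partition_persist(AwesomeStateFactory.new,
                             fields('message'),
                             AwesomeUpdater.new)
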
And that's it for getting a very basic topology running. So let's go back to our initial Twitter hash tag example. This is what it might look like as a Storm topology (many mights). You'd start with a Tweet spout, however you decide to get your Tweets in: via Redis, via the Twitter API directly, whatever. You'd pass it to a transform whose only goal is to take the Tweet body, get out the hash tags, and output them as tuples. The next step is an aggregate, which takes all the hash tags and tracks how many counts there are. Then it sends that to the state, and the state does the moving average calculation.

This is what it might look like. Again, pretty basic. ExtractHashTags inherits from the base function; we define execute, and what it does is get the Tweet body out of the tuple, extract the hash tags (I believe it's the same code as before), loop over them, and emit new tuples.

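As a sketch:

    # Purely functional: Tweet body in, one tuple per hash tag out.
    class ExtractHashTags < BaseFunction
      def execute(tuple, collector)
        body = tuple.get_value_by_field('tweet')

        body.scan(/#(\w+)/).flatten.each do |tag|
          collector.emit([tag])
        end
      end
    end
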
Next, the aggregator. First: an aggregate function is basically like Ruby's inject on Enumerable. You give it an initial state, and it loops over the batch. The aggregate method is the iteration step: it's passed the state being built up, each tuple, and, again, an optional output collector in case you want to emit tuples mid-iteration.

In the init method, you return the initial state. Finally, there's an extra complete method, which says: we are done aggregating, so let us output our aggregation as tuples onto the stream. In this case we loop over our summary, which is a hash map of hash tag to count, and output tuples containing the hash tag and the count.

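Sketched out, with the init/aggregate/complete life cycle visible. BaseAggregator is Trident's base class; the rest of the names are mine:

    class HashTagCounter < BaseAggregator
      # Called at the start of each batch: return the initial state.
      def init(batch_info, collector)
        Hash.new(0) # hash tag -> count
      end

      # The iteration step, like the block you'd give Enumerable#inject.
      def aggregate(counts, tuple, collector)
        counts[tuple.get_value_by_field('hashtag')] += 1
      end

      # Called once the batch is done: flush the summary onto the stream.
      def complete(counts, collector)
        counts.each { |tag, count| collector.emit([tag, count]) }
      end
    end
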
And hook it all up. Pretty similar. The main thing with aggregate transforms is that you call partition_aggregate, and I will cover that in a bit. At the same time, we'll add our trending topic state; the wiring is sketched below.

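The trending-topics topology, end to end, as a sketch (the factory and updater mirror the earlier state example):

    topology.new_stream('tweets', TweetSpout.new)
      .each(fields('tweet'), ExtractHashTags.new, fields('hashtag'))
      .partition_aggregate(fields('hashtag'),
                           HashTagCounter.new,
                           fields('hashtag', 'count'))
      .partition_persist(TrendingTopicStateFactory.new,
                         fields('hashtag', 'count'),
                         TrendingTopicUpdater.new)
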
This is how that state might be implemented. TrendingTopicState inherits from State, exactly the same as previously. I'm going to implement begin_commit and commit, but for now I'll leave them empty and come back to them. The update method takes the hash tag and the count, and for now it does the exact same work as the Sidekiq version: find the existing hash tag record by name, update the moving average, and save it again. And the updater takes in all the tuples and just passes them to the state.

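Sketched the same way as before:

    class TrendingTopicState
      include Java::StormTridentState::State

      # Empty for now; these get interesting in a minute.
      def begin_commit(txid); end
      def commit(txid); end

      def update(hashtag, count)
        # Exactly the Sidekiq version's work, for now.
        tag = HashTag.find_or_initialize_by(name: hashtag)
        tag.update_ewma(count)
        tag.save!
      end
    end

    class TrendingTopicUpdater < Java::StormTridentState::BaseStateUpdater
      def update_state(state, tuples, collector)
        tuples.each do |tuple|
          state.update(tuple.get_value_by_field('hashtag'),
                       tuple.get_value_by_field('count'))
        end
      end
    end
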
So you might be wondering: I thought streams were unbounded sequences of tuples. If that's the case, when does one call the complete method? Infinite time from now? No. The way Storm executes a topology is in batches. The Storm coordinator tells your spout (I'm just going to read what the slide says): yo spout, start batch id 123.

The spout then says: OK, cool, we're starting batch 123, let's get some data for it. I fetched two hundred tuples from whatever source; say I popped two hundred messages off of Redis. Seems good. The batch then goes through everything we just saw: the messages go down to the hash tag function, then to the aggregate function, then to the state. The state persists the result and reports back to Storm.

At that point we can tell Storm: we completed batch 123. What this means is that every part of the API that appears to be time-based, like begin_commit, commit, and, in our case, the complete method of the aggregate function, gets called after every single batch. So when we say "aggregate all the counts of the hash tags," we're really just doing it for one batch. And then it starts the next batch: start batch 124.

So, is it web scale yet? This is obviously the question you have to ask after every single hour of work. Well, this is what we've got so far; this is what I told you was happening. We've got a Tweet spout sending data down a single stream into extract-hash-tags. So: can a single stream handle fifty thousand Tweets per second? Let's break it down a bit.

This is the most basic topology, and it's what I said happens conceptually. But the way it actually executes, a stream has many partitions. The number of partitions is configurable, but every single partition is completely independent, so partitions can run on completely different nodes in your Storm cluster. If you have eight partitions, you have the opportunity to read from the spout on eight different servers, and to transform and persist on eight different servers. You can have a thousand partitions if that's how far you need to go.

The main point is that you can split up the work load: even though you have one conceptual stream, you can split it across many partitions, and thus across servers and threads. So let's work through the hash tag example again with partitions. We have the spout, and a Tweet that says RailsConf comes out of the spout in one partition. Because we haven't said anything yet, it just gets sent to the aggregate transform in the same partition, and it goes to the state on the same partition as well.

Here, on the left, is another RailsConf tag. It's in a completely different partition, so possibly on a different server, and it goes to the aggregate on a completely different server. Ideally, at the very end, we'd want to do the moving average calculation on one server, so ideally both would end up on the same state partition.

If we have another tag, say hash tag sleep (something I would like to do), it stays on its own partition; it doesn't need to run on the same server that processes the RailsConf hash tag. But that one move, from the aggregate's middle partition up to a single combined end-point state partition, is something we haven't defined yet.

The way you do that is, at any point, you can say: here, I want to partition my tuples by hash tag. When we say that, we're specifying a field on the tuple, and we're telling Storm: once we cross this point, we'd like any tuples with the same hash tag value to end up in the same partition. Up to this point, Storm could actually have run the entire topology in one partition on one server.

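In the topology definition, that's one extra step in the chain, something like this sketch (Trident's partitionBy; a groupBy on the field accomplishes the same routing):

    # From this point on, tuples with equal 'hashtag' values are routed
    # to the same partition, and therefore to the same server.
    stream.partition_by(fields('hashtag'))
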
Before that, there was no need (you could configure it to, but no need) for a Tweet to hit the network and go to a different node. The moment you add a partition-by statement, you're saying: OK, yes, at this point, we have to make sure that all tuples with the same hash tag value end up on the same server. So Storm will make sure that happens, and it may hit the network to do it.

So how many partitions should we run? This is very, very use-case specific. You could run eight. You could run five hundred twelve. It really depends. But to answer this better, we need to talk a bit about how Storm schedules partitions on the cluster. First of all, there are many servers; you have a cluster, many servers, distributed, yay. Let's say three servers, and each server can run many threads, because Storm is very, very concurrent.

So you might have thirty, forty, some configurable number of threads per server. And there can be many partitions per thread as well. A thread in Storm is called an executor, and an executor can have an arbitrary number of partitions. Say you boot a Storm cluster of three nodes, with nine partitions and three threads; it works out pretty nicely. This is how it might start out initially.

So you boot up, and it assigns three partitions per thread, per server. The nice thing about dealing in partitions is that it's what basically everything seriously distributed ends up doing: Riak, Cassandra, Kafka, all of these work via partitions, because it's a very nice model. But that's a little tangent.

What happens when you lose a server? Oh, we don't have those executors anymore, but we have all these partitions and they still need to be handled. Storm redistributes them across the remaining servers: all those partitions, which represent work load, get moved to the other servers.

And, inversely, that little server box on top means we're adding a server to our cluster. Then we rebalance, and Storm takes the existing partitions and redistributes them across the available servers. Probably more evenly than my little graph shows. OK.

It's time for some real talk: failure. The question is not, will there be failure. The question is, how do we handle it? If you're building a distributed system, at some point something is going to fail, and you can either close your eyes and pretend it doesn't happen, or you can think through the problems. How will we recover? Is the system going to be in an inconsistent state afterwards? Are we going to lose availability during that time?

Handling failure properly is probably the hardest part of building any distributed system. And you don't have to be using really fancy technologies to be building a distributed system. If you build a plain old Rails app, you have a distributed system: the browser talks to the Rails app. All these problems exist there too; maybe we don't think about them as much because we're OK with higher error rates and inconsistencies. But think through even a simple case like a signup form. The user fills out the signup form, hits submit, and it errors out. What happened? Did the request fail to reach our server? Did it reach our server, and we started processing it, but hit an exception? Did we actually create the account in the database, so that the next time the user refreshes the page, thinking, I'd really like to sign up, I'm going to try again...

...they get: oh no, you can't sign up, your email is taken. Well, OK. How do we handle all this? And there are ways. I'm just bringing up this simple case, which I'm sure everybody here has properly protected their Rails app against, because nobody in this room has ever had this bug happen.

I bring it up because this is a relatively simple case, and it really just gets more complicated from here. In Storm, the whole point is that you may have one input tuple, say one message popped off the queue, and as you process it, you end up generating more tuples. For example, from one Tweet we are going to extract many hash tag messages. And as you build more and more complex topologies, this fans out.

So what happens if this one tuple dies? Everything else succeeded, but that one, and all of its children, didn't. What do we do then? Do we try to retry just that one tuple? If we do, we have to track that that specific one failed, which is going to involve a lot of bookkeeping. And what happens if tuples have multiple parents, and one parent succeeded but not the other? That's possible with Storm: tuples can have multiple parents because you can have joins. So how does one process it? Do you retry the entire batch? It's hard.

I will get to how you handle it. The details of how Storm does it internally are really interesting, and I'd recommend you go read the wiki page on it, but I don't have time to talk about it. Going back to the original Sidekiq example: what happens if we're halfway through the iteration, we've persisted some of the tags, and it just explodes? Do we retry the message? I don't know. What now?

Generally, when we're using a system like Sidekiq or Storm, we have to think about the message processing guarantees, because they're going to change how you work with it. The two main ones are: are messages processed at least once, or are messages processed at most once? You can only get one or the other. If anyone says "we guarantee that a message is going to be processed exactly once," they're lying. It's impossible.

Storm is at-least-once. So what we can try to do is handle the case where messages show up multiple times: can we handle that? Can we avoid redoing the processing? Can we try to ensure that we end up as close as possible to processing exactly once?

The way Storm approaches this is really pretty nice. Like I said, we have batches; everything is processed in batches. What Storm guarantees is that the next batch will not reach completion until the previous one is fully committed, and it also gives you monotonically increasing batch ids.

So it guarantees that the next batch id will be bigger than the previous one. We can combine these properties. (I'll Tweet my slides, which will be easier to read.) I'm now introducing begin_commit, and I'm running out of time, so I'm going to try to get through this.

The main thing: in begin_commit we get the transaction id, and what we can do is store it off and load up the state; I'll come back to that. The important thing I want to get to is here, in the update function: we don't actually write to the database anymore. We just store the update in memory. And the main line is: return if hash_tag.last_transaction_id == transaction_id. What that means is, since we know the guarantees, if for some reason a message failed and we reprocess the batch, we can know that this hash tag actually made it past a certain point, so we don't need to recompute the moving average for it.

That gets us as close as possible to exactly once.

So we store all of this in memory, and once the commit happens, which is at the end of the batch, we do all the work that hits the outside world, which is writing to the database; we do all of that there. If we reach completion, the batch completes successfully, and we know all the messages have been successfully processed. But if it fails during the persist, half of the records will have the new transaction id and half will have the old one, and when the batch gets replayed and we reload the state, we'll see: ah, this record already finished this batch.

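Here's my sketch of that transactional version; last_transaction_id is an assumed extra column on the record:

    class TrendingTopicState
      include Java::StormTridentState::State

      def begin_commit(txid)
        @txid    = txid
        @pending = {} # hash tag -> record updated in memory this batch
      end

      def update(hashtag, count)
        tag = @pending[hashtag] ||= HashTag.find_or_initialize_by(name: hashtag)

        # Replayed batch? This record already took this update; skip it.
        return if tag.last_transaction_id == @txid

        tag.update_ewma(count)
        tag.last_transaction_id = @txid
      end

      def commit(txid)
        # Only now do we touch the outside world.
        @pending.each_value(&:save!)
      end
    end
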
To roll through this real quick (I believe I'm getting really close to, or past, my time): let's get data. What happens? I fetch two hundred tuples. Boom. We go through. Aggregate. And it explodes. Eventually the Storm coordinator says: guys, I did not get a completion message. All right, fine. Spout, we're going to restart batch 123, because I did not get completion. And the Tweet spout says: OK, I got this. I'm going to re-emit exactly the same two hundred tuples, in the same order as before.

What that means is: if we successfully implemented our transforms to be purely functional, completely deterministic based on their input, then as the batch goes through again, we get exactly the same outputs. By the time we reach the state, everything is, in theory, exactly the same, so when we actually run the persist, for anything we already successfully persisted under the same transaction id, we know it would be exactly the same output.

All right. So the main catch is, of course, that transforms must be purely functional, because otherwise this won't work. That means never using Time.now within your transforms. (I actually changed that on the last slide, but I had to zoom through it.) If you need the current time for a batch, one way to do it is to have a spout that only emits the current time; if the batch gets re-emitted, the spout knows, because it saved off somewhere: for this batch id, I output this time.

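Conceptually, such a spout looks like this sketch. A real spout implements more of the batch-spout interface, and would persist the emitted times somewhere durable, like Zookeeper, instead of an in-memory hash:

    class CurrentTimeSpout
      def initialize
        @emitted = {} # batch id -> the time emitted for that batch
      end

      def emit_batch(batch_id, collector)
        # On replay, reuse the time recorded for this batch id, so the
        # batch stays identical.
        time = (@emitted[batch_id] ||= Time.now)
        collector.emit([time])
      end
    end
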
The next thing is that this requires spouts to be able to re-emit exactly identical batches, which, unfortunately, I think most spouts cannot do; Redis, for example, does not support it. To get this level of guarantee, the queue to use is Kafka. I would love to talk more about Kafka and why it's so awesome, but we can do that after. In general, even if you're not using Storm, Kafka is a really amazing tool that I highly recommend.

So, TL;DR: Storm is a really powerful platform for writing workers. It's really great for stateful jobs, and by stateful jobs I mean jobs that depend on the result of previous ones. And it's good for complex data processing flows. And that's it, I think. My time wasn't up here, so I'm guessing this is about right.

Am I really early or late? OK. Cool. I didn't have a clock up here either, so I was guessing. All right. Well, I'm done. Thanks.