RailsConf 2014 - Supercharge Your Workers with Storm by Carl Lerche

  • 0:17 - 0:18
    CARL LERCHE: My name is Carl Lerche and
  • 0:18 - 0:22
    I'm gonna be talking about Apache Storm.
  • 0:22 - 0:24
    I am shooting for forty minutes.
  • 0:24 - 0:26
    It's going to be a long talk.
  • 0:26 - 0:27
    I'm gonna try to cram everything in.
  • 0:27 - 0:29
    I already got a late start.
  • 0:29 - 0:31
    There probably won't be time for questions.
  • 0:31 - 0:32
    I'm also probably going to gloss over
  • 0:32 - 0:34
    a few things that, like, hand-wave-y things,
  • 0:34 - 0:37
    so if something comes up, you have a question,
  • 0:37 - 0:39
    just send me a Tweet directly
  • 0:39 - 0:41
    and I will respond to it after the talk.
  • 0:41 - 0:44
    Or just come and talk.
  • 0:44 - 0:48
    So I work for Tilde, and our product is
  • 0:48 - 0:50
    Skylight, and we're building a smart profiler
    for your
  • 0:50 - 0:54
    Rails app. And I've actually built the entire
    backend
  • 0:54 - 0:57
    using Storm, so another thing, if you want
    to
  • 0:57 - 0:58
    talk about how that works, how we've, I've
    ended
  • 0:58 - 1:01
    up using Storm and the, in the real world,
  • 1:01 - 1:03
    I guess. We, I'll be happy to talk about
  • 1:03 - 1:04
    it.
  • 1:04 - 1:08
    So what is Storm? Let's start with how it
  • 1:08 - 1:11
    describes itself on the web site. It calls
    itself
  • 1:11 - 1:15
    a distributed real time computation system,
    and that sounds
  • 1:15 - 1:19
    really, really fancy. The first time I thought
    about
  • 1:19 - 1:21
    using it, I was like, oh, let me go
  • 1:21 - 1:23
    look and see what Storm's all about. I saw
  • 1:23 - 1:25
    this, was like, whoa. Step back. I'm not going
  • 1:25 - 1:27
    to. This, this sounds like, this is too serious
  • 1:27 - 1:30
    business for me. I need something simpler.
  • 1:30 - 1:32
    It took me like maybe like another six months
  • 1:32 - 1:33
    of really, OK, I'm gonna take time, I'm gonna
  • 1:33 - 1:36
    learn this, and as I got to know it,
  • 1:36 - 1:38
    it really came to me. Oh, this can be
  • 1:38 - 1:40
    used for many, many other use cases. I'm not
  • 1:40 - 1:44
    going to. I might, at the end, if there's
  • 1:44 - 1:46
    time, talk about some of them, but I'm going
  • 1:46 - 1:49
    to try to get into the nitty gritty sooner
  • 1:49 - 1:50
    than later.
  • 1:50 - 1:52
    So, but for now, I'm just gonna say, Storm
  • 1:52 - 1:56
    is a really powerful worker system. So some
    things
  • 1:56 - 1:59
    about it. It is distributed. It's very, very,
    very
  • 1:59 - 2:01
    distributed, which is a good and a bad thing.
  • 2:01 - 2:04
    One, the good thing is you can, like, spread
  • 2:04 - 2:07
    your load across many servers. The bad thing
    is,
  • 2:07 - 2:09
    your operate, like your ops system is gonna
    look
  • 2:09 - 2:12
    something like this. Which can turn out to
    be
  • 2:12 - 2:15
    somewhat of an operational headache. So, like,
    you'll end
  • 2:15 - 2:17
    up having to run a Zookeeper cluster. You'll
    end
  • 2:17 - 2:20
    up having to run, like, the Storm Nimbus process
  • 2:20 - 2:22
    somewhere, and then there's gonna be a number
    of
  • 2:22 - 2:25
    other Storm coordination processes. There's
    gonna be, like, every
  • 2:25 - 2:26
    server's gonna have the worker process.
  • 2:26 - 2:28
    And then, like, while you're at it, you've
    gone
  • 2:28 - 2:30
    this far, let's throw in a distributed database
    and
  • 2:30 - 2:33
    some distributed queues in there too. But
    the main
  • 2:33 - 2:35
    point is there's going to be an operational
    overhead.
  • 2:35 - 2:38
    So don't just do it for fun. There needs
  • 2:38 - 2:40
    to be a good reason. If you can get
  • 2:40 - 2:42
    the job done on a single server, go for
  • 2:42 - 2:44
    that.
  • 2:44 - 2:48
    It's fault-tolerant, and what I mean by that
    is,
  • 2:48 - 2:51
    it is able to recover and continue making
    progress
  • 2:51 - 2:53
    in the event of a failure. And I know
  • 2:53 - 2:57
    a lot of existing systems claim
    to
  • 2:57 - 3:01
    do this, but the reality is, handling faults
    is
  • 3:01 - 3:06
    really, really hard. So it doesn't make it,
    like,
  • 3:06 - 3:09
    seamless. Storm doesn't make it seamless.
    The reality is
  • 3:09 - 3:10
    it can't be seamless. But I think, and I'm
  • 3:10 - 3:12
    going to talk about it later, I think the
  • 3:12 - 3:14
    way it does it is about as easy as
  • 3:14 - 3:16
    it can get. I hope.
  • 3:16 - 3:20
    It's really fast. Very low overhead. Some
    completely useless
  • 3:20 - 3:23
    benchmark numbers. It's been clocked at, like,
    processing over
  • 3:23 - 3:26
    a million messages per second on a single
    server.
  • 3:26 - 3:30
    Completely useless number but it sounds good.
  • 3:30 - 3:34
    Supposedly it's language agnostic. And by
    that I mean,
  • 3:34 - 3:36
    supposedly you can use absolutely any language
    at all
  • 3:36 - 3:41
    to build your data processing pipeline. There
    are examples,
  • 3:41 - 3:45
    on the website, where they use bash to, I
  • 3:45 - 3:48
    don't know why, it's probably a novelty. If
    you're into
  • 3:48 - 3:51
    distributed bash, sounds good. But I guess
    that means
  • 3:51 - 3:55
    you can probably use MRI. However, I've personally
    only
  • 3:55 - 3:58
    used it on the JVM. It's a JVM-based project.
  • 3:58 - 4:00
    It's written in Java and Clojure. It uses
    many
  • 4:00 - 4:04
    Java libraries. I would just recommend, if
    you're gonna
  • 4:04 - 4:06
    try it, try it out on the JVM.
  • 4:06 - 4:08
    And the good news is, we have JRuby, which
  • 4:08 - 4:11
    has done an extremely good job at integrating
    with
  • 4:11 - 4:15
    Java. So you can use these Java libraries,
    and
  • 4:15 - 4:17
    you're just writing Ruby. Some of the things
    I
  • 4:17 - 4:20
    want to handwave also is exactly how to get,
  • 4:20 - 4:22
    like, all the details of getting it running
    on
  • 4:22 - 4:25
    JRuby. But there is a GitHub project called
    redstorm.
  • 4:25 - 4:29
    You could probably just Google GitHub Redstorm.
    I've realized
  • 4:29 - 4:31
    I didn't actually include any links. But,
    again, I
  • 4:31 - 4:33
    can find them later.
  • 4:33 - 4:36
    So it provides a JRuby binding to Storm. It
  • 4:36 - 4:38
    also goes a bit further. It provides, like,
    Ruby
  • 4:38 - 4:41
    DSLs for everything related to storm. So you
    can
  • 4:41 - 4:43
    define all your data transformations in terms
    of Ruby
  • 4:43 - 4:45
    DSLs.
  • 4:45 - 4:48
    So the way I'm gonna kind of go over
  • 4:48 - 4:50
    storm is I'm going to build up a straw
  • 4:50 - 4:51
    man. That straw man is going to be, how
  • 4:51 - 4:54
    would one implement Twitter trending topics.
    You know, it's,
  • 4:54 - 4:57
    I think I, we all hopefully know about what
  • 4:57 - 5:01
    a trending topic is. That's kind of why I
  • 5:01 - 5:05
    picked it. But, essentially, everybody
    Tweets and they
  • 5:05 - 5:08
    can include these hash tags. Hash tag RailsConf.
    Hash
  • 5:08 - 5:12
    tag whatever. And as the rates of single hash
  • 5:12 - 5:15
    tags increase, like the highest, the most,
    the hash
  • 5:15 - 5:17
    tags that occur at the highest rates end up
  • 5:17 - 5:19
    getting bubbled up, and Twitter says, ah,
    these are
  • 5:19 - 5:21
    the topics that are trending so that everybody
    can
  • 5:21 - 5:21
    see what's going on.
  • 5:21 - 5:24
    So, like, around here, you might see, RailsConf
    is
  • 5:24 - 5:26
    trending. Another good reason, a reason why
    I picked
  • 5:26 - 5:30
    it is it's very time-sensitive, and ties
  • 5:30 - 5:32
    into the real time aspect of Storm.
  • 5:32 - 5:34
    All right. So to implement this, the way I'm
  • 5:34 - 5:38
    going to calculate the rate, like how often
    a
  • 5:38 - 5:40
    Tweet is happening is by using an exponentially
    weighted
  • 5:40 - 5:44
    moving average. Fancy. Basically what it means
    is like,
  • 5:44 - 5:47
    as we're moving up, going, like, processing
    the Tweets,
  • 5:47 - 5:48
    what I'm gonna do is I'm gonna count the
  • 5:48 - 5:50
    number of occurrences of each hash tag, for
    like,
  • 5:50 - 5:53
    every five seconds. The interval doesn't
    really matter.
  • 5:53 - 5:55
    Then we're gonna average them up.
  • 5:55 - 5:58
    And, but instead of doing a normal average,
    which
  • 5:58 - 6:00
    is sum up all the values divided by the
  • 6:00 - 6:02
    number of values, we're going to do a weighted
  • 6:02 - 6:05
    average, which is going to cause the older,
    like
  • 6:05 - 6:10
    the older occurrences, to drop in weights
    exponentially. This
  • 6:10 - 6:13
    is how Linux does their one minute, five minute,
  • 6:13 - 6:14
    and fifteen minute load averages.
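
A minimal Ruby sketch of the exponentially weighted moving average being described; the five-second bucket and one-minute decay horizon are illustrative choices, not from the slides:

    # Each bucket contributes ALPHA of its count; older buckets decay
    # by (1 - ALPHA) per tick, so their weight drops off exponentially.
    INTERVAL = 5.0                              # seconds per bucket
    ALPHA    = 1 - Math.exp(-INTERVAL / 60.0)   # ~1 minute decay horizon

    def next_rate(rate, count)
      ALPHA * count + (1 - ALPHA) * rate
    end

    rate = 0.0
    [3, 5, 0, 2].each { |count| rate = next_rate(rate, count) }
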
  • 6:14 - 6:17
    It's a pretty nice way of smoothing out the
  • 6:17 - 6:19
    curves, and you can tune how you want the
  • 6:19 - 6:23
    curves to end up getting smoothed. So first
    is
  • 6:23 - 6:26
    kind of build up some context. Let's kind
    of
  • 6:26 - 6:28
    look at how one might implement this with
    Sidekiq
  • 6:28 - 6:32
    or, like, Resque or whatever. Any other, like,
    kind
  • 6:32 - 6:35
    of simpler system that just pops off of a
  • 6:35 - 6:36
    queue with a worker.
  • 6:36 - 6:37
    This is what it might look like. You've got
  • 6:37 - 6:40
    your Rails app. Pushes data into Redis and
    then
  • 6:40 - 6:42
    each, like, the worker says, OK, I'm gonna
    pop
  • 6:42 - 6:45
    a message off of Redis. I'm going to read
  • 6:45 - 6:47
    straight from the database. I'm gonna process
    the message.
  • 6:47 - 6:49
    I'm gonna write straight back out from the
    database.
  • 6:49 - 6:51
    And then the Rails app can just query the
  • 6:51 - 6:55
    result by reading it straight from the database.
    Here's
  • 6:55 - 6:56
    how it might run. Is
  • 6:56 - 7:00
    that bright enough? Hopefully it is. The comment
    basically
  • 7:00 - 7:02
    says, yes, this is very naive. I know it.
  • 7:02 - 7:05
    But that's not the point. Don't say, oh you
  • 7:05 - 7:08
    could do this one million times faster. That's
    not
  • 7:08 - 7:09
    the point.
  • 7:09 - 7:11
    Hopefully it's readable.
  • 7:11 - 7:13
    So tags. So the basic premise is you have
  • 7:13 - 7:15
    an entry method, which takes in a Tweet as
  • 7:15 - 7:19
    an argument, and it, first step is to extract
  • 7:19 - 7:21
    the hash tags from the body. So I get,
  • 7:21 - 7:23
    like, an array of tags, and I loop over
  • 7:23 - 7:25
    each tag and I read from the database, is
  • 7:25 - 7:28
    there already an existing record? If so I'm
    going
  • 7:28 - 7:31
    to update the moving average for it and then
  • 7:31 - 7:32
    save it back.
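
A hedged sketch of the naive worker just described; HashTag as an ActiveRecord model and the regex-based tag extraction are assumptions, since the slide code isn't in the transcript:

    # Requires sidekiq and an ActiveRecord HashTag model (assumed).
    class TrendWorker
      include Sidekiq::Worker

      def perform(tweet_body)
        extract_hash_tags(tweet_body).each do |tag|
          # Read existing state, update the moving average, write it back.
          record = HashTag.find_or_initialize_by(name: tag)
          record.update_ewma(1)
          record.save!
        end
      end

      def extract_hash_tags(body)
        body.scan(/#(\w+)/).flatten
      end
    end
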
  • 7:32 - 7:38
    Oh. Yeah. Put highlights. So this is how one
  • 7:38 - 7:41
    might. It doesn't really matter. It was kind
    of
  • 7:41 - 7:43
    like for informational purposes. But this
    is how one
  • 7:43 - 7:48
    might implement the moving average like function.
    So the,
  • 7:48 - 7:51
    the most important part that I want to call,
  • 7:51 - 7:54
    like, draw attention to is that in the update
  • 7:54 - 7:58
    EWMA, the very first thing we do is catch
  • 7:58 - 8:02
    up. And I didn't pass now. But in
  • 8:02 - 8:03
    theory it should pass now down to the catch
  • 8:03 - 8:07
    up and it will keep ticking to update the
  • 8:07 - 8:09
    rate for every bucket that we decided we're
    smoothing
  • 8:09 - 8:12
    by. So, like, every five seconds it's going
    to
  • 8:12 - 8:15
    re-update the rate so that it will
  • 8:15 - 8:19
    downweight previous values and go on from
    there.
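
A sketch of the update-with-catch-up being described, reusing the ALPHA and INTERVAL constants from the earlier sketch; assuming these live on the HashTag model with rate and last_updated_at columns:

    def update_ewma(count, now = Time.now)
      catch_up(now)   # first, tick through any buckets we slept through
      self.rate = ALPHA * count + (1 - ALPHA) * rate
    end

    def catch_up(now)
      # Feed a count of zero for every elapsed bucket, so the rate keeps
      # decaying even when no tweets with this tag arrive.
      while last_updated_at + INTERVAL <= now
        self.rate = (1 - ALPHA) * rate
        self.last_updated_at += INTERVAL
      end
    end
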
  • 8:19 - 8:22
    So this is going to expose a problem with
  • 8:22 - 8:24
    our first version of the worker, in that if
  • 8:24 - 8:27
    we do not receive any Tweets with a given
  • 8:27 - 8:32
    hash tag, what, how do we handle that? We
  • 8:32 - 8:34
    need to still update the record in order to
  • 8:34 - 8:37
    constantly update the rates so that, ah, we're
    not
  • 8:37 - 8:40
    receiving any data. We have to start lowering
  • 8:40 - 8:43
    the weight of that hashtag.
  • 8:43 - 8:45
    So, just, we'll have to end up implementing
    something
  • 8:45 - 8:48
    like this. Which is going to be scheduled
    via
  • 8:48 - 8:51
    cron or just some regularly scheduled
    task that
  • 8:51 - 8:55
    probably has to run every bucket interval.
    And first
  • 8:55 - 8:58
    step is just delete all the existing tags
    that
  • 8:58 - 9:00
    are way too old just to clear the database.
  • 9:00 - 9:03
    And then load up all the tags and update
  • 9:03 - 9:07
    the rates for them. Make sure they're caught
    up.
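
A sketch of that regularly scheduled task; the staleness cutoff is an illustrative assumption:

    class CatchUpAllTags
      STALE = 24 * 60 * 60   # illustrative cutoff for "way too old"

      def run(now = Time.now)
        # First, clear out tags that haven't been seen in ages.
        HashTag.where("updated_at < ?", now - STALE).delete_all

        # Then load up every remaining tag and make sure it's caught up.
        HashTag.find_each do |tag|
          tag.catch_up(now)
          tag.save!
        end
      end
    end
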
  • 9:07 - 9:09
    So this is going to, one thing to point
  • 9:09 - 9:11
    out is, OK, so we now have to have
  • 9:11 - 9:14
    a task that is going to read literally every
  • 9:14 - 9:17
    record from the database every tick. And there
    probably
  • 9:17 - 9:18
    is a better way to do this. But that's
  • 9:18 - 9:20
    not the point.
  • 9:20 - 9:23
    So is it web scale, yet? Well, let's look.
  • 9:23 - 9:26
    So far, not yet. But what we can do
  • 9:26 - 9:28
    is we can start scaling out our workers. We're
  • 9:28 - 9:31
    just gonna like, ah, let's spawn up three
    workers.
  • 9:31 - 9:34
    Now we're ready to handle fifty-thousand Tweets
    per second.
  • 9:34 - 9:37
    Well, maybe not, but we have more tricks up
  • 9:37 - 9:39
    our sleeve. Like, we know, like I mentioned,
    we
  • 9:39 - 9:41
    know that the database is going to be a
  • 9:41 - 9:43
    pretty big bottleneck, cause every single
    Tweet we get
  • 9:43 - 9:44
    in, it's going to have to read state, process
  • 9:44 - 9:46
    it, and write it back.
  • 9:46 - 9:49
    What if we could optimize this and remove
    the
  • 9:49 - 9:53
    read state by caching in memory. Super
    simple
  • 9:53 - 9:57
    implementation, which is more or less represented
    as like, oh,
  • 9:57 - 9:59
    like, we basically build an in memory cache
    of
  • 9:59 - 10:02
    hash tags in the process, such that every
    time
  • 10:02 - 10:03
    we get a Tweet, it comes in and we
  • 10:03 - 10:06
    first check our cache before trying to, making
    a
  • 10:06 - 10:08
    new one. And we update that and save it
  • 10:08 - 10:09
    back to the database.
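
A sketch of the in-process cache version; note it deliberately has the coherence problem walked through next:

    class TrendWorker
      include Sidekiq::Worker

      CACHE = {}   # per-process cache: tag name => HashTag record

      def perform(tweet_body)
        extract_hash_tags(tweet_body).each do |tag|
          # Check the cache before hitting the database for a read.
          record = CACHE[tag] ||= HashTag.find_or_initialize_by(name: tag)
          record.update_ewma(1)
          record.save!   # the write still happens every time
        end
      end
    end
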
  • 10:09 - 10:11
    So let's run through this and see what's happening.
  • 10:11 - 10:14
    Here's the first Tweet that comes, that comes
    through.
  • 10:14 - 10:17
    We include, queue up, it comes in through
    the
  • 10:17 - 10:22
    queue. Fancy animations, yeah. And it goes
    to the
  • 10:22 - 10:24
    first worker. The worker processes it. It's
    like, OK,
  • 10:24 - 10:25
    my cache is empty. I'm gonna check in the
  • 10:25 - 10:28
    database. Nothing for RailsConf in the database.
    So I'm
  • 10:28 - 10:31
    just gonna process that in memory, the count is one.
    And
  • 10:31 - 10:33
    we're gonna write it to the database. K.
  • 10:33 - 10:35
    Next Tweet comes in. Ah, what's for lunch
    at
  • 10:35 - 10:38
    RailsConf? Well we know now, but it's going
    to
  • 10:38 - 10:40
    be, like, queued, like, again, queued up in
    here,
  • 10:40 - 10:41
    and I'm sure you know where this is going.
  • 10:41 - 10:44
    Goes to the second worker. Second worker reads
    the
  • 10:44 - 10:47
    counts and it works, increments count to two
    and
  • 10:47 - 10:50
    it goes back and writes the database.
  • 10:50 - 10:52
    And here's the third Tweet. And we're gonna
    get
  • 10:52 - 10:54
    to it. This one's going to go back to
  • 10:54 - 10:56
    the first worker, it's like, oh, I have everything
  • 10:56 - 10:58
    cached. I don't need to read from the database.
  • 10:58 - 11:03
    No problem. We'll count to two and, yes. Yes.
  • 11:03 - 11:06
    This. I did this way too late. I was
  • 11:06 - 11:08
    like, ah, I'm just gonna do fancy animations.
    All
  • 11:08 - 11:09
    right, so, caching in this kind of system
    is
  • 11:09 - 11:12
    not obvious. We have this problem of, like,
    we
  • 11:12 - 11:16
    have, we can still make it work. There's many
  • 11:16 - 11:18
    things we could do. Like, we could push, like,
  • 11:18 - 11:20
    we could push the cache external. We could
    use
  • 11:20 - 11:21
    memcache, but now we still have an
  • 11:21 - 11:26
    external process that is required for coordinating
    the work.
  • 11:26 - 11:29
    So, the main thing is, even though we have
  • 11:29 - 11:31
    many worker jobs, there is still a high amount
  • 11:31 - 11:35
    of coordination that's required to get this
    to work.
  • 11:35 - 11:36
    K.
  • 11:36 - 11:39
    Enter Storm. And, as you can guess, probably
    this
  • 11:39 - 11:41
    is gonna, like, all these problems are gonna
    magically
  • 11:41 - 11:44
    go away when we use storm. I wish. But
  • 11:44 - 11:47
    it'll, there's something we can do. Let's
    start with
  • 11:47 - 11:52
    some abstractions. The core abstractions in
    Storm are the
  • 11:52 - 11:56
    streams and the tuples. A stream is, essentially,
    a
  • 11:56 - 12:00
    series of tubes through which data flows.
    And, but
  • 12:00 - 12:03
    they call them streams. I would, tubes would
    have
  • 12:03 - 12:04
    been better.
  • 12:04 - 12:07
    And tuples are just, it's just like a list
  • 12:07 - 12:10
    of values. And these values can be anything.
    They're
  • 12:10 - 12:11
    really just the messages. So you can put in
  • 12:11 - 12:13
    tuples, you can put your strings, you can
    put
  • 12:13 - 12:15
    integers, you can put any object you want
    as
  • 12:15 - 12:18
    long as you can serialize it.
  • 12:18 - 12:20
    And you, you're allowed to specify custom
    serialization. So
  • 12:20 - 12:23
    as long as you can serialize it to JSON,
  • 12:23 - 12:25
    serialize it to any serialization format,
    you can put
  • 12:25 - 12:28
    that object in the tuple.
  • 12:28 - 12:29
    The rest of Storm is going to be a
  • 12:29 - 12:32
    series, a bunch of primitives to take these
    streams
  • 12:32 - 12:35
    and tuples and to transform the data from,
    like,
  • 12:35 - 12:39
    one representation to the other.
  • 12:39 - 12:43
    Next, spouts and states. So the spouts are
    the
  • 12:43 - 12:45
    source of the streams. So this is how you
  • 12:45 - 12:48
    get data into storm. This is the starting
    point
  • 12:48 - 12:52
    of the streams. So anything that you want
    to
  • 12:52 - 12:54
    read from the outside world is going to end
  • 12:54 - 12:56
    up being represented as a spout. And this
    can
  • 12:56 - 13:00
    be reading from redis, reading from SQS, reading
    from
  • 13:00 - 13:03
    anything. But it can also be, you could implement
  • 13:03 - 13:06
    a spout that reads directly from the Twitter
    API.
  • 13:06 - 13:09
    You could implement a spout that reads from
    the
  • 13:09 - 13:13
    database, that makes HTTP requests, that gets
    the time
  • 13:13 - 13:16
    of the day, even. And, yes, you will want
  • 13:16 - 13:18
    to actually implement a spout that gets current
    time
  • 13:18 - 13:20
    of the day. And I'll talk about that later.
  • 13:20 - 13:25
    States are the opposite. State is how you
    get
  • 13:25 - 13:28
    the result of the transformations outside
    of Storm. So
  • 13:28 - 13:31
    exactly the opposite. If you want to write
    outside
  • 13:31 - 13:32
    of Storm, like you write to the database,
  • 13:32 - 13:36
    make HTTP post requests, send email, push
    to external
  • 13:36 - 13:40
    queues, anything like that. It's going to
    be implemented
  • 13:40 - 13:44
    as a state. So, so far this is what
  • 13:44 - 13:46
    we know about Storm.
  • 13:46 - 13:48
    We have Spout. It starts a stream. Data is
  • 13:48 - 13:49
    going to flow through it and it's going to
  • 13:49 - 13:54
    end up at the state. So far nothing is
  • 13:54 - 13:55
    super interesting.
  • 13:55 - 13:58
    Transform. The rest is really about how do
    we
  • 13:58 - 14:02
    transform the data as it flows through. And
    transforms
  • 14:02 - 14:05
    are gonna be purely functional operations
    on the data.
  • 14:05 - 14:09
    So they are going to be: given the
  • 14:09 - 14:12
    same inputs, they're going
    to output
  • 14:12 - 14:15
    the same values. Anyway.
  • 14:15 - 14:18
    So let's add a couple transforms here. So
    we
  • 14:18 - 14:20
    have a spout. It's going to send some data
  • 14:20 - 14:22
    and we can, like, filter if we need to.
  • 14:22 - 14:24
    So we'll filter some data out and we'll aggregate
  • 14:24 - 14:26
    it. And the data will flow through and end
  • 14:26 - 14:30
    up at the state and then get persisted.
  • 14:30 - 14:33
    And then more transforms. Basically from here,
    what you
  • 14:33 - 14:35
    end up doing is modeling your data in terms
  • 14:35 - 14:38
    of dataflow to get to the end point. And
  • 14:38 - 14:41
    there's really, you can, there's no limitation
    in terms
  • 14:41 - 14:44
    of, I mean, granted you don't add a million
  • 14:44 - 14:46
    spouts because that will probably be crazy
    and you'll
  • 14:46 - 14:47
    be, not be able to understand the code. But
  • 14:47 - 14:50
    besides that, you can add as many spouts as
  • 14:50 - 14:52
    you want. You can add as many state end
  • 14:52 - 14:55
    points. You can, like, split the streams.
    You can
  • 14:55 - 14:59
    join them. And do whatever real transformation
    that you
  • 14:59 - 15:01
    want. And Storm is going to take care of
  • 15:01 - 15:05
    figuring out how to take this definition,
    like, the
  • 15:05 - 15:08
    state of the definition and distribute it
    across your
  • 15:08 - 15:09
    cluster.
  • 15:09 - 15:13
    And this entire graph, basically, starting
    at
  • 15:13 - 15:17
    spouts and ending at states
    is
  • 15:17 - 15:21
    called the topology. And in a bit, I'm gonna
  • 15:21 - 15:24
    show how to define the topology.
  • 15:24 - 15:28
    So, OK. Let's break it down just a little
  • 15:28 - 15:31
    bit. So the spout is where it starts, and
  • 15:31 - 15:33
    then it's going to emit tuples to the filter.
  • 15:33 - 15:36
    Generally, as you're gonna get the spout,
    like, it's
  • 15:36 - 15:38
    gonna be a pretty standard thing. Like, for
    example,
  • 15:38 - 15:40
    if you are pulling data
  • 15:40 - 15:42
    from Redis, you will use a Redis
  • 15:42 - 15:45
    spout that already exists. And it will literally
    just
  • 15:45 - 15:48
    be in your definition, just use Redis spouts.
    And
  • 15:48 - 15:52
    filters, as well, any, any transforms can
    usually be
  • 15:52 - 15:55
    abstracted up to higher level concepts and
    shared.
  • 15:55 - 15:59
    So, for example, this is how you might implement
  • 15:59 - 16:06
    the my filter function or transform
  • 16:06 - 16:09
    for Storm. So the base function is provided
    as
  • 16:09 - 16:13
    part of the Storm API. And the requirement
    is
  • 16:13 - 16:15
    that you define a single method that takes
    the
  • 16:15 - 16:19
    input as a tuple and it will take the
  • 16:19 - 16:22
    output, which represents the stream. So you
    will get
  • 16:22 - 16:24
    in a tuple, you're gonna process it, and you
  • 16:24 - 16:28
    will then emit zero or more output tuples
    onto
  • 16:28 - 16:29
    the output stream.
  • 16:29 - 16:32
    So, the first step in the filter is just
  • 16:32 - 16:34
    get the message out of
  • 16:34 - 16:36
    the tuple, and this is somewhat of a Java-ish
  • 16:36 - 16:41
    API, but it's possible to shim. So, get value
  • 16:41 - 16:43
    by field. In this case, we're just treating
  • 16:43 - 16:46
    the tuple more as a hash map than anything
  • 16:46 - 16:48
    else. So we get the message. Is the message
  • 16:48 - 16:51
    awesome? If so, we just output it on the
  • 16:51 - 16:54
    output stream. And that is all that it takes
  • 16:54 - 16:56
    to implement a very simple transform.
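
A sketch of that filter in redstorm-flavored Ruby; the exact shim class and method names may differ from the slide:

    class MyFilter < BaseFunction
      def execute(tuple, collector)
        message = tuple.get_value_by_field("message")
        # Emit zero or one tuples: only awesome messages continue downstream.
        collector.emit([message]) if message.awesome?
      end
    end
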
  • 16:56 - 17:00
    K. Here's another example. And this is not
    something
  • 17:00 - 17:03
    to run in production but is super helpful
    for
  • 17:03 - 17:07
    debugging. All right, the next step is define
    the
  • 17:07 - 17:10
    topology. Like, define how all the different,
    like, transforms
  • 17:10 - 17:13
    and spouts and everything hook up together.
  • 17:13 - 17:17
    And the important part's on the screen. So
    the
  • 17:17 - 17:21
    define_topology method doesn't matter, it's
    whatever. The main points
  • 17:21 - 17:23
    you will start at, you will get a topology
  • 17:23 - 17:26
    object, and on that you'll use a series of
  • 17:26 - 17:29
    APIs to define how things flow together. So you
    start
  • 17:29 - 17:31
    by defining the stream and you just name it
  • 17:31 - 17:34
    as, ah, my-spout, and in this case, I'm saying
  • 17:34 - 17:37
    MyQueueSpout, but it would be, like,
    RedisSpout
  • 17:37 - 17:41
    dot new, specify the server, like, wherever
    the
  • 17:41 - 17:42
    server is and the topic you want to read
  • 17:42 - 17:45
    from. Or, if you're using SQS you would pass
  • 17:45 - 17:49
    in your credentials and the topic you're reading
    from,
  • 17:49 - 17:51
    et cetera.
  • 17:51 - 17:55
    Next step, usually the spouts will just output
    tuples
  • 17:55 - 17:57
    that are raw bytes. So it will just get
  • 17:57 - 17:58
    the raw bytes from Redis and output it, so
  • 17:58 - 18:01
    the next step is to deserialize the raw bytes,
  • 18:01 - 18:03
    so we are getting a message that, we are
  • 18:03 - 18:06
    saying ah, we are expecting in, as an input,
  • 18:06 - 18:08
    a tuple that contains the field bytes. We
    are
  • 18:08 - 18:12
    gonna pass it to the queue message deserializer.
    And
  • 18:12 - 18:14
    the output is gonna be a tuple that contains
  • 18:14 - 18:14
    message.
  • 18:14 - 18:18
    I included an example implementation
    under it. And
  • 18:18 - 18:20
    then we're gonna chain that, and we're gonna
    say,
  • 18:20 - 18:22
    like, for each tuple that we get, we're gonna
  • 18:22 - 18:25
    expect, well, we're gonna expect a tuple that
    contains
  • 18:25 - 18:27
    the field message. We're gonna pass it to
    my
  • 18:27 - 18:30
    filter, which will filter it based off of
    our
  • 18:30 - 18:33
    predicates, and we'll output another tuple
    that contains a
  • 18:33 - 18:38
    message. And finally we'll pass it to the logger.
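
A sketch of that wiring, Trident-style through JRuby; class names are illustrative stand-ins for whatever spout and deserializer you're using:

    def define_topology(topology)
      topology
        .new_stream("my-spout", MyQueueSpout.new)
        .each(Fields.new("bytes"),   QueueMessageDeserializer.new,
              Fields.new("message"))
        .each(Fields.new("message"), MyFilter.new, Fields.new("message"))
        .each(Fields.new("message"), LoggerFunction.new, Fields.new)
    end
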
  • 18:38 - 18:40
    Next step, run it. And the easiest way to
  • 18:40 - 18:43
    get this running, just for exper, like, playing
    around,
  • 18:43 - 18:47
    is just running it locally. And redstorm does
    this,
  • 18:47 - 18:51
    but if you want to just, like, the actual
  • 18:51 - 18:53
    amount of code to get it started
  • 18:53 - 18:55
    is just this. You initialize a new
  • 18:55 - 18:59
    cluster object. You define new TridentTopology.
    I'm going to
  • 18:59 - 19:01
    just kind of like, wave my hands over what
  • 19:01 - 19:04
    Trident is. You can read the Wiki.
  • 19:04 - 19:07
    And doesn't really matter for the talk. You
    can
  • 19:07 - 19:11
    initialize a config and then there's our define_topology
    method.
  • 19:11 - 19:13
    You pass in topology and then you submit it
  • 19:13 - 19:15
    to the cluster, which is just your locally
    running
  • 19:15 - 19:18
    machine. And the entire thing is running.
    Winning.
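
The local-mode boilerplate being described, roughly; method spellings follow the JRuby convention for Storm's Java API:

    cluster  = LocalCluster.new
    topology = TridentTopology.new
    config   = Config.new

    define_topology(topology)

    # Submit to the in-process "cluster" on your own machine.
    cluster.submit_topology("trending-topics", config, topology.build)
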
  • 19:18 - 19:20
    But only a little bit winning, because we
    have
  • 19:20 - 19:23
    not, we're not doing anything with the data
    yet.
  • 19:23 - 19:25
    So the next step is going to be to
  • 19:25 - 19:29
    persist the results. And, again, there are
    higher-level things
  • 19:29 - 19:32
    that, there are like, there are already existing
    libraries
  • 19:32 - 19:34
    where it's like, oh, you take in these tuples
  • 19:34 - 19:36
    and just throw them, throw them entirely in
    Redis
  • 19:36 - 19:38
    that take no code. But I am going to
  • 19:38 - 19:40
    jump directly down into how you would implement
    a
  • 19:40 - 19:43
    state from scratch.
  • 19:43 - 19:47
    So what it takes is, you just make a
  • 19:47 - 19:51
    class, inherit from state. This is provided
    from, as
  • 19:51 - 19:54
    part of the Storm API as well. The only
  • 19:54 - 19:56
    methods you have to define are begin commit
    and
  • 19:56 - 20:00
    commit, and I'm gonna cover those later. Besides
    that,
  • 20:00 - 20:03
    you just provide whatever you want. And as
    an
  • 20:03 - 20:07
    API to interact with the state. Because the next
    component is
  • 20:07 - 20:11
    going to be the state updater.
  • 20:11 - 20:15
    The state updater takes in the, requires you
    to
  • 20:15 - 20:19
    define the method update_state, which is going
    to pass
  • 20:19 - 20:21
    in the instance of your state and is gonna
  • 20:21 - 20:25
    pass in the input tuple and an optional, like,
  • 20:25 - 20:27
    it also gives you an output stream in case
  • 20:27 - 20:31
    you want your state to emit more tuples. But,
  • 20:31 - 20:32
    in our case, we're gonna, all we're gonna
    do
  • 20:32 - 20:35
    is extract the message out of the tuple
  • 20:35 - 20:38
    and call persist awesomely, and that's it
    for now.
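
A sketch of a from-scratch state plus its updater; persist_awesomely is the talk's own placeholder, the rest is approximate:

    class AwesomeState < State
      def begin_commit(txid); end   # covered later
      def commit(txid); end         # covered later

      def persist_awesomely(message)
        # whatever API you want your updater to call; write out here
      end
    end

    class AwesomeStateUpdater < BaseStateUpdater
      def update_state(state, tuples, collector)
        tuples.each do |tuple|
          state.persist_awesomely(tuple.get_value_by_field("message"))
        end
      end
    end
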
  • 20:38 - 20:43
    Except, cause it's Java, it's also a factory.
    But,
  • 20:43 - 20:48
    let's not mention that. So here we have, I
  • 20:48 - 20:51
    just wanted to add the state to the topology
  • 20:51 - 20:54
    that we're defining. And we're just adding,
    all I
  • 20:54 - 20:56
    did was replace the logger one, and instead
    of
  • 20:56 - 21:01
    logging, I'm calling partition persist, passing
    in the factory.
  • 21:01 - 21:04
    I'm saying, now it'll just build
    the
  • 21:04 - 21:08
    state. But I'm expecting a tuple that has
    the
  • 21:08 - 21:13
    field message, and I'm passing the basic
    updater.
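
The same chain with the logger swapped for the persistence step; the argument order mirrors Trident's partitionPersist(factory, inputFields, updater):

    topology
      .new_stream("my-spout", MyQueueSpout.new)
      .each(Fields.new("bytes"),   QueueMessageDeserializer.new,
            Fields.new("message"))
      .each(Fields.new("message"), MyFilter.new, Fields.new("message"))
      .partition_persist(AwesomeStateFactory.new, Fields.new("message"),
                         AwesomeStateUpdater.new)
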
  • 21:13 - 21:18
    And that's it for getting a very basic topology
  • 21:18 - 21:23
    running. So the next step is let's go back
  • 21:23 - 21:30
    to our initial, like, Twitter hash tag example.
    This
  • 21:30 - 21:32
    is somewhat what, like, this might be what
    it
  • 21:32 - 21:34
    might look like. Ah, many mights. This is
    what
  • 21:34 - 21:37
    it could look like as a Storm topology. You'd
  • 21:37 - 21:39
    start with a Tweet spout, however you decide
    to
  • 21:39 - 21:43
    get your Tweets in. Via Redis, via directly
    Twitter
  • 21:43 - 21:46
    API, whatever. You pass it, you'd pass it
    to
  • 21:46 - 21:49
    a transform, which the entire, which its only
    goal
  • 21:49 - 21:52
    is to get the Tweet body, get out the
  • 21:52 - 21:54
    hash tags and output them as tuples. The next
  • 21:54 - 21:56
    step is going to be an aggregate, and what
  • 21:56 - 21:58
    that's going to do is it's going to get
  • 21:58 - 22:01
    all the, it's going to get all the hash
  • 22:01 - 22:04
    tags and track how many counts there are.
    And
  • 22:04 - 22:06
    then it's gonna send that to the state, and
  • 22:06 - 22:09
    the state is going to do the moving average
  • 22:09 - 22:12
    calculation.
  • 22:12 - 22:15
    This is what it might look like. Again, pretty
  • 22:15 - 22:19
    basic. Extract hash tags inherits from base
    function. We
  • 22:19 - 22:21
    define execute and what it's gonna do is get
  • 22:21 - 22:24
    the Tweet body out of the Tuple, extract hash
  • 22:24 - 22:26
    tags. I believe it's the same code. And loop
  • 22:26 - 22:30
    over it and just emit new tuples.
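
A sketch of that transform; one tweet tuple in, one tuple out per hash tag:

    class ExtractHashTags < BaseFunction
      def execute(tuple, collector)
        body = tuple.get_value_by_field("tweet")
        body.scan(/#(\w+)/).flatten.each do |tag|
          collector.emit([tag])
        end
      end
    end
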
  • 22:30 - 22:34
    Next, the aggregator. So the first thing it
    does
  • 22:34 - 22:37
    is just init. Oh, yeah. First:
  • 22:37 - 22:44
    an aggregate function is basically just like
    Ruby's
  • 22:44 - 22:47
    inject on enumerable. So you pass it an initial
  • 22:47 - 22:49
    state and then it's going to loop over whatever
  • 22:49 - 22:52
    it is and pass you each one. So the
  • 22:52 - 22:54
    aggregate is the iteration. It's going to
    pass in
  • 22:54 - 22:56
    the state that it's building up to as well
  • 22:56 - 22:59
    as each Tuple, and again, an optional output
    stream
  • 22:59 - 23:04
    that you could output Tuples in mid-iteration.
  • 23:04 - 23:09
    And the init method, you return the initial
    state.
  • 23:09 - 23:11
    Finally there's going to be an extra complete
    method,
  • 23:11 - 23:15
    which is oh, we are done aggregating, so let
  • 23:15 - 23:19
    us then finally output our aggregation as
    tuples to
  • 23:19 - 23:22
    the stream. And, in this case, just going
    to
  • 23:22 - 23:24
    loop over our, our summary, which is a hash
  • 23:24 - 23:25
    map with a hash tag and a count. I'm
  • 23:25 - 23:28
    gonna output tuples that contain the
  • 23:28 - 23:32
    hash tag and the count.
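
A sketch of the aggregator, shaped like Trident's init/aggregate/complete contract; details are approximate:

    class CountHashTags < BaseAggregator
      def init(batch_id, collector)
        Hash.new(0)            # initial state: tag => count for this batch
      end

      def aggregate(counts, tuple, collector)
        counts[tuple.get_value_by_field("hash_tag")] += 1
      end

      def complete(counts, collector)
        # Done aggregating: emit one tuple per tag with its batch count.
        counts.each { |tag, count| collector.emit([tag, count]) }
      end
    end
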
  • 23:32 - 23:36
    And hook it all up. Pretty similar. The main
  • 23:36 - 23:41
    things with aggregates, aggregates functions
    or aggregate transforms, you
  • 23:41 - 23:46
    want to call partition aggregate. And I will
    cover
  • 23:46 - 23:48
    that in a bit. And also we're going to,
  • 23:48 - 23:52
    at the same time, add our trending topic state.
  • 23:52 - 23:55
    So this one might be implemented, let's again,
    we
  • 23:55 - 23:58
    inherit from, OK, at the very top, all I do
  • 23:58 - 24:01
    is trending topic state inherits from state.
    Exactly the
  • 24:01 - 24:01
    same as previously.
  • 24:01 - 24:05
    I'm going to implement begin commit and commit,
    for
  • 24:05 - 24:08
    now. And I'm gonna leave them empty and come
  • 24:08 - 24:11
    back to it. And the update method is going
  • 24:11 - 24:14
    to take the hash tag and the count. And
  • 24:14 - 24:16
    it's going to do the exact same work that
  • 24:16 - 24:19
    it did in the Sidekiq one, for now. So
  • 24:19 - 24:21
    it's going to find an existing hash tag record
  • 24:21 - 24:25
    by name and update the moving average and
    save
  • 24:25 - 24:28
    it again. And the updater is going to take
  • 24:28 - 24:30
    in all the tuples and just pass it to
  • 24:30 - 24:31
    the states.
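
A sketch of that state and updater pair, still doing the Sidekiq-era read/update/write per tag; begin_commit and commit stay empty for now:

    class TrendingTopicState < State
      def begin_commit(txid); end   # left empty, revisited later
      def commit(txid); end

      def update(hash_tag, count)
        record = HashTag.find_or_initialize_by(name: hash_tag)
        record.update_ewma(count)
        record.save!
      end
    end

    class TrendingTopicUpdater < BaseStateUpdater
      def update_state(state, tuples, collector)
        tuples.each do |t|
          state.update(t.get_value_by_field("hash_tag"),
                       t.get_value_by_field("count"))
        end
      end
    end
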
  • 24:31 - 24:36
    So you might be wondering, well, I thought
    streams
  • 24:36 - 24:40
    were unbounded sequences of tuples. So if
    that is
  • 24:40 - 24:45
    the case, when does one call the complete
    method?
  • 24:45 - 24:48
    Infinite time from now? I don't know. But
    no.
  • 24:48 - 24:52
    What, the way Storm executes its topology
    is it
  • 24:52 - 24:55
    executes in batches. So the way it works is
  • 24:55 - 24:59
    gonna be the Storm coordinator, is going to,
    here
  • 24:59 - 25:01
    we go. Storm coordinator is going to tell
    your
  • 25:01 - 25:04
    spout, yo spout. I'm gonna just read what
    it
  • 25:04 - 25:08
    says. Yo spout, start batch id 123.
  • 25:08 - 25:11
    And the spout is going to then be like,
  • 25:11 - 25:13
    OK. Cool. We're starting batch 123. Let's
    get some
  • 25:13 - 25:17
    data for it. I fetched two hundred tuples
    from
  • 25:17 - 25:20
    whatever source. Like, I, I popped two hundred
    messages
  • 25:20 - 25:23
    off of Redis. Seems good. And this is gonna
  • 25:23 - 25:24
    then go through everything we just saw, send
    the
  • 25:24 - 25:26
    messages down. It's gonna go to the hash tag
  • 25:26 - 25:28
    function, which will then go to the aggregate
    function,
  • 25:28 - 25:31
    which then go to states. The states are gonna
  • 25:31 - 25:34
    persist the result and send it back to
  • 25:34 - 25:35
    Storm.
  • 25:35 - 25:37
    So then we can tell
  • 25:37 - 25:41
    Storm, we completed batch 123. So
    what
  • 25:41 - 25:46
    it means is, whenever there's a
    part
  • 25:46 - 25:49
    of the API that appears to be time-based,
    like
  • 25:49 - 25:52
    begin commit, commit, and in our case, the
    complete
  • 25:52 - 25:55
    or the aggregate function, that's gonna be
    called after
  • 25:55 - 25:59
    every single batch. So what, when we say aggregate
  • 25:59 - 26:03
    all, like, aggregate all the counts of the
    hash
  • 26:03 - 26:05
    tags, we're really just doing it for a batch.
  • 26:05 - 26:07
    Oh yeah. And then it's going to start the
  • 26:07 - 26:10
    next batch. This just says start batch 124.
  • 26:10 - 26:17
    So is it web scale yet? Well, this is
  • 26:17 - 26:19
    obviously always the question you have to ask
    after
  • 26:19 - 26:23
    every single thing. Every single hour of work.
  • 26:23 - 26:25
    Well, this is what I've got so far. This
  • 26:25 - 26:27
    is what I told you was happening. We've got
  • 26:27 - 26:29
    a Tweet spout. It's sending data down a stream,
  • 26:29 - 26:32
    a single stream, into, to the extract hash
    tags.
  • 26:32 - 26:36
    So we have one stream and, well, can a
  • 26:36 - 26:41
    single stream handle fifty thousand Tweets
    per second? Well,
  • 26:41 - 26:43
    let's break it down for a bit.
  • 26:43 - 26:45
    This is the most basic topology. This is what
  • 26:45 - 26:48
    I said happened. This is what conceptually
    it happens.
  • 26:48 - 26:54
    But, the way it executes is, a stream actually
  • 26:54 - 26:57
    has many partitions. So the number of partitions
    is
  • 26:57 - 27:03
    configurable. But every single partition can,
    is completely independent.
  • 27:03 - 27:07
    So it can run on completely different nodes
    in
  • 27:07 - 27:11
    your Storm cluster. So if you have eight different
  • 27:11 - 27:14
    partitions, you have the opportunity
    to read
  • 27:14 - 27:17
    from the spout on eight different
    servers,
  • 27:17 - 27:20
    and to transform and persist
    on eight different
  • 27:20 - 27:22
    servers. You could have up to, you can have
  • 27:22 - 27:25
    like a thousand different partitions if that's
    how far
  • 27:25 - 27:27
    you need to go.
  • 27:27 - 27:30
    But the main point is that you can split
  • 27:30 - 27:32
    up the work load even though you have one
  • 27:32 - 27:35
    conceptual stream, you can split it up across
    many
  • 27:35 - 27:40
    partitions and thus servers and threads. So
    let's kind
  • 27:40 - 27:44
    of try to work through, again, the hash tag
  • 27:44 - 27:48
    thing through how it might work with partitions.
    We
  • 27:48 - 27:52
    had the spout. That says RailsConf. And it
    comes
  • 27:52 - 27:54
    out of the spout in one partition, and because
  • 27:54 - 27:56
    we haven't said anything yet, it's just gonna
    say,
  • 27:56 - 27:58
    oh, we're just gonna send it to the aggregate
  • 27:58 - 28:03
    transform in the same partition. And it's
    going to
  • 28:03 - 28:05
    go to the state as well on the same
  • 28:05 - 28:07
    partition.
  • 28:07 - 28:10
    Here's another bottom left, like, on the left
    side,
  • 28:10 - 28:12
    another RailsConf tag. It's going to, it's
    in a
  • 28:12 - 28:14
    completely different partition. So possibly
    a different server. It's
  • 28:14 - 28:17
    going to go the aggregate in a completely
    different
  • 28:17 - 28:21
    server, and ideally what we'd want is for,
    at
  • 28:21 - 28:23
    the very end, we can do the moving average
  • 28:23 - 28:25
    calculation on the same server. So ideally
    it would
  • 28:25 - 28:29
    end up on the same state partition.
  • 28:29 - 28:31
    But if we have another, like this is just
  • 28:31 - 28:34
    hash tag sleep, something I would like to
    do,
  • 28:34 - 28:36
    it would stay on its own partition and it
  • 28:36 - 28:38
    would not, like, it doesn't need to be on
  • 28:38 - 28:42
    the, run on the same server that processes
    the
  • 28:42 - 28:44
    RailsConf hash tag.
  • 28:44 - 28:46
    But the one thing, like, that one
  • 28:46 - 28:49
    move from the aggregate middle partition,
    combining
  • 28:49 - 28:53
    into one end-point state partition,
    we haven't
  • 28:53 - 28:55
    defined yet.
  • 28:55 - 28:57
    The way you do that is, at any point,
  • 28:57 - 28:59
    you can be, oh, at this point, I want
  • 28:59 - 29:02
    to partition my tuples by hash tag. And when
  • 29:02 - 29:04
    we say that, what we're saying is we're specifying
  • 29:04 - 29:06
    a field on the tuple, and we're saying, hey
  • 29:06 - 29:09
    storm, we would like, once we cross this point,
  • 29:09 - 29:11
    we would like any tuple that has the same
  • 29:11 - 29:14
    hash tag value to end up in the same
  • 29:14 - 29:17
    partition. So this, at this point, like, up
    to
  • 29:17 - 29:19
    now, Storm could actually run the
    entire topology
  • 29:19 - 29:21
    in the same partition on one server.
  • 29:21 - 29:25
    There was no need to, like, it might, I
  • 29:25 - 29:26
    mean you could specify, make it do that, but
  • 29:26 - 29:29
    there is actually no need to process, to have
  • 29:29 - 29:31
    a Tweet hit the network, basically. To go
    to
  • 29:31 - 29:33
    a different node. The moment you add a partition,
  • 29:33 - 29:36
    like statement, you're saying, OK, yes, at
    this point,
  • 29:36 - 29:37
    we're going to have to make sure that all
  • 29:37 - 29:39
    hash tags with the same value end up on
  • 29:39 - 29:42
    the same server. So it's going to, at this
  • 29:42 - 29:44
    point, try and, like, make sure to do that,
  • 29:44 - 29:48
    and thus may hit the network for it.
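
A sketch of inserting that partitioning point, mirroring Trident's partitionBy; everything after this call sees like hash tags in the same partition:

    stream
      .each(Fields.new("tweet"), ExtractHashTags.new, Fields.new("hash_tag"))
      .partition_by(Fields.new("hash_tag"))   # same tag => same partition
      .partition_aggregate(Fields.new("hash_tag"), CountHashTags.new,
                           Fields.new("hash_tag", "count"))
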
  • 29:48 - 29:50
    So how many partitions should we run? Well,
    this
  • 29:50 - 29:53
    is very, very use-case specific. You could
    run eight.
  • 29:53 - 29:58
    You could run five-hundred twelve. You, it,
    it actually
  • 29:58 - 30:01
    depends on your use case. But in order to
  • 30:01 - 30:03
    answer this better, we need to talk a bit
  • 30:03 - 30:10
    about how Storm schedules partitions on the
    cluster. So
  • 30:10 - 30:12
    first of all, there are many servers. And
    you
  • 30:12 - 30:15
    have a cluster, you have many servers distributed,
    yay.
  • 30:15 - 30:18
    So let's say, I don't know, three servers,
    and
  • 30:18 - 30:22
    each server could have many threads per server,
    because
  • 30:22 - 30:25
    it's very, very concurrent.
  • 30:25 - 30:28
    So you might have, I don't know, thirty, forty,
  • 30:28 - 30:31
    a configurable amount of threads
    per server.
  • 30:31 - 30:34
    And there can be many partitions per thread,
    as
  • 30:34 - 30:37
    well. So every, a thread in Storm is considered
  • 30:37 - 30:41
    an executer. And an executer can have an arbitrary
  • 30:41 - 30:43
    number of partitions. So when you start up
    your
  • 30:43 - 30:45
    cluster, like let's say you've got, you want
    to
  • 30:45 - 30:48
    boot up a Storm cluster that is three nodes,
  • 30:48 - 30:51
    you have nine partitions, three threads. So
    it works
  • 30:51 - 30:53
    out pretty nice. This is how it might, like,
  • 30:53 - 30:54
    start initially.
  • 30:54 - 30:59
    So you boot up and you have, like, anyway.
  • 30:59 - 31:02
    So it's going to assign three partitions per
    thread,
  • 31:02 - 31:06
    per server. So, the nice thing of dealing
  • 31:06 - 31:10
    with partitions: dealing with partitions is
    actually something
  • 31:10 - 31:14
    that basically everything that ends up being
    pretty distributed
  • 31:14 - 31:17
    does. Like, Riak, Cassandra, Kafka, all these
    things go
  • 31:17 - 31:20
    via partitions, and, because it's very nice.
    But little
  • 31:20 - 31:21
    tangent.
  • 31:21 - 31:24
    So, what happens is, when you lose a server,
  • 31:24 - 31:27
    oh. We don't have these executers anymore.
    We have
  • 31:27 - 31:31
    all these partitions, they need to be handled
    still.
  • 31:31 - 31:34
    Storm is going to redistribute them across
    existing servers.
  • 31:34 - 31:35
    So it's gonna be like, OK, all these, all
  • 31:35 - 31:39
    these partitions which represent work, work
    load, are gonna
  • 31:39 - 31:42
    be moved to the other servers.
  • 31:42 - 31:46
    And, inversely, that's a little, on the top
    is
  • 31:46 - 31:47
    a little server box, and I'm saying, ah, we're
  • 31:47 - 31:50
    adding a server to our cluster. And then we're
  • 31:50 - 31:53
    gonna rebalance it, and Storm is going to
    take
  • 31:53 - 31:57
    the existing partitions and redistribute,
    redistribute it across the
  • 31:57 - 31:59
    available servers.
  • 31:59 - 32:03
    Probably more evenly than my little graph
    shows.
  • 32:03 - 32:03
    OK.
  • 32:03 - 32:10
    It's
    time for some real talk. Failure.
  • 32:13 - 32:18
    So, the question is not will there be failure.
  • 32:18 - 32:22
    The question is, how do we handle it? Cause,
  • 32:22 - 32:24
    if you're building a distributed system, at
    some point
  • 32:24 - 32:25
    there is going to be fail, there is going
  • 32:25 - 32:30
    to be something that fails. And you can either
  • 32:30 - 32:32
    close your eyes and pretend it doesn't happen
    or
  • 32:32 - 32:34
    you can think through the problems, like how
    will
  • 32:34 - 32:36
    we recover? Is the system going to be in
  • 32:36 - 32:39
    an inconsistent state afterwards? Is, are
    we gonna lose
  • 32:39 - 32:42
    availability during this time?
  • 32:42 - 32:45
    Handling failure properly is probably the
    hardest part of
  • 32:45 - 32:50
    building any distributed system. But it's
    not like you
  • 32:50 - 32:52
    have to use really fancy technologies to build
    a
  • 32:52 - 32:55
    distributed system. Like if you build a plain-old
    Rails
  • 32:55 - 32:57
    app to build a distributed system, you have
    a,
  • 32:57 - 32:59
    the browser talks to the Rails app. And all
  • 32:59 - 33:01
    these problems that exist, maybe we don't
    think about
  • 33:01 - 33:04
    it as much because we're OK with higher error
  • 33:04 - 33:07
    rates and inconsistencies. But, let's think,
    like, even a
  • 33:07 - 33:11
    simple case like a signup form, like the user
  • 33:11 - 33:16
    fills up the signup form and hits submit and
  • 33:16 - 33:19
    it errors out, what happened, right? Did the,
    did
  • 33:19 - 33:21
    it fail reaching our server? Did it reach
    our
  • 33:21 - 33:25
    server and we started processing it and, but
    we
  • 33:25 - 33:29
    had an exception? Did we, like, actually create
    the
  • 33:29 - 33:31
    account in the database and the next time
    the
  • 33:31 - 33:33
    user is going to refresh the page it'll be,
  • 33:33 - 33:34
    like, I'd really like to signup.
    I'm
  • 33:34 - 33:36
    gonna try to signup again.
  • 33:36 - 33:39
    And it's gonna be, oh no, you can't signup.
  • 33:39 - 33:41
    Your email is taken. Well, OK. How do we
  • 33:41 - 33:44
    handle all this? And there are ways. I'm just
  • 33:44 - 33:47
    kind of bringing up this simple case, which
    I'm
  • 33:47 - 33:51
    sure everybody here has properly protected
    their Rails app
  • 33:51 - 33:54
    against, because nobody in this room has ever
    had
  • 33:54 - 33:56
    this bug happen.
  • 33:56 - 33:59
    But, I bring it up because this is a
  • 33:59 - 34:02
    relatively simple case, and it really just
    gets more
  • 34:02 - 34:05
    complicated from here. So, for example, in
    Storm, the
  • 34:05 - 34:08
    whole, like, it's fine. The whole point is
    that,
  • 34:08 - 34:10
    you may have one input tuple. Like, one message
  • 34:10 - 34:13
    might be popped up from the queue, and as
  • 34:13 - 34:15
    you process it, you're going to end up generating
  • 34:15 - 34:18
    more tuples. Like, for example, we are going
    to,
  • 34:18 - 34:20
    from one Tweet, we are going to extract many
  • 34:20 - 34:24
    hash tag messages. And if you, as you build
  • 34:24 - 34:26
    more and more complex topologies, this is
    just going
  • 34:26 - 34:30
    to like, this can, like, fan out.
  • 34:30 - 34:34
    So what happens if, like, this one dies? So
  • 34:34 - 34:38
    everything else, like, succeeded, but that
    one, and all
  • 34:38 - 34:43
    of its children, didn't. How do we, then,
    what
  • 34:43 - 34:47
    do we do? Do we try to retry just
  • 34:47 - 34:49
    that one tuple? If we do that, we then
  • 34:49 - 34:53
    have to, well, we have to track that
  • 34:53 - 34:55
    it is that specific one that failed, that's gonna
    involve
  • 34:55 - 35:00
    a lot of bookkeeping. The, and what happens
    if
  • 35:00 - 35:02
    tuples have multiple parents? And one parent
    ended up
  • 35:02 - 35:04
    succeeding but not the other?
  • 35:04 - 35:07
    And that's possible, with Storm. You can have
    tuples
  • 35:07 - 35:10
    that have multiple parents because you can
    have joins.
  • 35:10 - 35:12
    But how does one process it? Do you retry
  • 35:12 - 35:18
    the entire batch? It's hard.
  • 35:18 - 35:20
    I will get to how you handle it. But,
  • 35:20 - 35:23
    and the details of how Storm does it internally
  • 35:23 - 35:26
    are really interesting, and I would recommend
    you go
  • 35:26 - 35:28
    and read the Wiki page on it, but I
  • 35:28 - 35:31
    don't have time to talk about it. So, going
  • 35:31 - 35:34
    back to the original Sidekiq thing, what happens
    if
  • 35:34 - 35:37
    we're, like, halfway through the iteration
    and we've persisted
  • 35:37 - 35:40
    some of them and, we, it just explodes halfway
  • 35:40 - 35:43
    through? Well, do we retry the message? Do
    we,
  • 35:43 - 35:47
    I don't know. What do? What now?
  • 35:47 - 35:50
    Generally, we have to think about, well, when
    we're
  • 35:50 - 35:52
    using a system like Sidekiq or Storm, what
    are
  • 35:52 - 35:55
    the message processing guarantees? Because
    it's going to change
  • 35:55 - 35:59
    how you work with it. Generally, the two,
    the
  • 35:59 - 36:03
    two main ones are, are messages processed
    at least
  • 36:03 - 36:05
    once, or are messages processed at most once?
    And
  • 36:05 - 36:07
    you can only get one, not the other. Like
  • 36:07 - 36:10
    you can, I mean, yes, you can get exactly
  • 36:10 - 36:11
    one, see what I mean. If anyone says, ah,
  • 36:11 - 36:13
    we guarantee that a message is going to be
  • 36:13 - 36:16
    processed exactly once, they're lying.
  • 36:16 - 36:18
    It's impossible.
  • 36:18 - 36:20
    So what we can try to do
  • 36:20 - 36:23
    is, well, Storm is at least once. But what
  • 36:23 - 36:25
    we can try to do is handle the case,
  • 36:25 - 36:28
    is like, can we handle, if we get messages
  • 36:28 - 36:30
    that show up multiple times, can we handle
    that?
  • 36:30 - 36:33
    Can we maybe not redo the processing? Can
    we,
  • 36:33 - 36:36
    like, can we try to ensure that we end
  • 36:36 - 36:39
    up with what, as close to as possible, as
  • 36:39 - 36:42
    close as possible to processing exactly once?
  • 36:42 - 36:46
    And the way Storm tries to approach this is
  • 36:46 - 36:49
    really pretty nice. Like I said, we have batches.
  • 36:49 - 36:51
    Everything's processed in batches. And what
    Storm will guarantee
  • 36:51 - 36:54
    is that a batch, like the next batch will
  • 36:54 - 36:58
    not complete, like, will not go, reach completion
    until
  • 36:58 - 37:00
    the previous one is fully committed. And it
    will
  • 37:00 - 37:04
    also give monotonically increasing batch ids.
  • 37:04 - 37:05
    So it will guarantee that the next batch id
  • 37:05 - 37:08
    will have a bigger id than the previous one.
  • 37:08 - 37:12
    So we can combine this, and I will just
  • 37:12 - 37:14
    Tweet my slides and it will be easier to
  • 37:14 - 37:17
    read, but the main thing is, I'm now introducing
  • 37:17 - 37:19
    begin commit, and I'm running out of
    time
  • 37:19 - 37:20
    so I'm gonna try to get through this.
  • 37:20 - 37:23
    But the main thing, if we, in begin
  • 37:23 - 37:24
    commit, we get the transaction id, what we
    can
  • 37:24 - 37:28
    do is store it off, and what, and load
  • 37:28 - 37:30
    up the state and we will talk about that
  • 37:30 - 37:32
    later. But the important thing I want to reach
  • 37:32 - 37:35
    is, talk about it is here, in the update
  • 37:35 - 37:37
    function, we don't actually write to the database
    anymore.
  • 37:37 - 37:39
    We just store that in memory. And the main
  • 37:39 - 37:43
    line is return if hash tag last transaction
    id
  • 37:43 - 37:45
    equals transaction id. What that means is
    we are
  • 37:45 - 37:47
    able to then see, OK, since we know the
  • 37:47 - 37:50
    guarantees, if for some reason a message failed
    and
  • 37:50 - 37:53
    we reprocess the batch, we, and we know, we
  • 37:53 - 37:55
    can know that this hash tag actually passed
    a
  • 37:55 - 37:58
    certain point, like passed that point, so
    we don't
  • 37:58 - 38:01
    need to recompute the moving average on it.
  • 38:01 - 38:05
    And get as close as possible to exactly once.
  • 38:05 - 38:09
    So then, we store all this memory, and once
  • 38:09 - 38:11
    the commit happens, which happens at the end
    of
  • 38:11 - 38:12
    the batch, we write all of this, we do
  • 38:12 - 38:15
    basically, we do all the work that hits the
  • 38:15 - 38:17
    outside world, which is write to the database.
    We
  • 38:17 - 38:22
    do all that there. And if we reach completion,
  • 38:22 - 38:25
    the batch will complete successfully, and
    we know, OK,
  • 38:25 - 38:29
    all the messages have been successfully processed.
    But if
  • 38:29 - 38:31
    it fails during the persist, half of them
  • 38:31 - 38:34
    will have the new transaction id and half
    of
  • 38:34 - 38:35
    them will have the old transaction id, but
    when
  • 38:35 - 38:37
    the batch gets replayed and we reload that,
    we
  • 38:37 - 38:41
    will see, ah, this message already finished
    this batch.
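
A sketch of the transactional version being described: stamp each record with the batch's transaction id, skip records already stamped, and buffer all writes until commit. Field and method names are assumptions:

    class TrendingTopicState < State
      def begin_commit(txid)
        @txid    = txid
        @pending = {}
      end

      def update(hash_tag, count)
        record = @pending[hash_tag] ||=
          HashTag.find_or_initialize_by(name: hash_tag)
        # This tag already made it through this exact batch on a previous
        # attempt; skip it so the replay doesn't double-count.
        return if record.last_txid == @txid
        record.update_ewma(count)
        record.last_txid = @txid
      end

      def commit(txid)
        # All the outside-world work happens here, at the end of the batch.
        @pending.each_value(&:save!)
      end
    end
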
  • 38:41 - 38:43
    So to roll through this real quick, I know
  • 38:43 - 38:45
    it's, I believe I'm getting really close or
    past
  • 38:45 - 38:48
    my time. But, so, yeah, let's get data. What
  • 38:48 - 38:52
    happens? Like, I fetch two hundred tuples.
    Boom. We
  • 38:52 - 38:54
    go through. Aggregate. And it explodes.
  • 38:54 - 38:59
    Eventually, the Storm coordinator's gonna
    be like, up above,
  • 38:59 - 39:02
    it says, guys, I did not get a completion
  • 39:02 - 39:05
    message. All right, fine. Spout, let, we're
    going to
  • 39:05 - 39:09
    restart batch 123, because I did not get completion.
  • 39:09 - 39:11
    So the Tweet spout's gonna be like, OK, I
  • 39:11 - 39:14
    got this. I'm gonna re-emit exactly the same
    two
  • 39:14 - 39:16
    hundred tuples in the same order that I did
  • 39:16 - 39:17
    before.
  • 39:17 - 39:20
    So what that means is if we successfully extract,
  • 39:20 - 39:23
    like, if we successfully implemented our transforms
    to be
  • 39:23 - 39:28
    purely functional, as in completely deterministic
    based off the
  • 39:28 - 39:30
    input, as we go through, we're gonna get exactly
  • 39:30 - 39:33
    the same outputs. Such that, once we get to
  • 39:33 - 39:36
    the state, I'd, in theory everything's going
    to be
  • 39:36 - 39:39
    exactly the same, such that when we actually
    run
  • 39:39 - 39:42
    the persist, we know that if something we
    successfully
  • 39:42 - 39:44
    persisted, and we get the same transaction
    id, we're
  • 39:44 - 39:45
    gonna be like, well, it's going to be exactly
  • 39:45 - 39:47
    the same output.
  • 39:47 - 39:52
    All right. So the main catch is, of course,
  • 39:52 - 39:55
    transforms must be purely functional, cause
    otherwise this won't
  • 39:55 - 39:59
    work. That means not ever using time dot now
  • 39:59 - 40:03
    within your transforms. I actually changed
    it on the
  • 40:03 - 40:05
    last slide, but I had to, like, zoom through
  • 40:05 - 40:07
    it. The main thing is, again, one, if you
  • 40:07 - 40:09
    have need to get the current time for a
  • 40:09 - 40:11
    batch, one way to do it is to have
  • 40:11 - 40:13
    a spout that only emits the current time.
    And
  • 40:13 - 40:15
    every, if the batch gets re-emitted, it just
    knows,
  • 40:15 - 40:18
    it saves off somewhere. For this batch id,
    I
  • 40:18 - 40:20
    outputted this time.
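
The shape of that time spout, sketched; a real transactional spout has more machinery, but the idea is just to remember what each batch saw:

    class TimeSpout
      def initialize
        @seen = {}   # batch_id => the time first emitted for that batch
      end

      def emit_batch(batch_id, collector)
        # A replayed batch gets exactly the time it got the first time.
        time = @seen[batch_id] ||= Time.now.to_i
        collector.emit([time])
      end
    end
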
  • 40:20 - 40:22
    And the next thing is that, it's going to
  • 40:22 - 40:27
    require the spouts to re-emit exactly identical
    batches. Which,
  • 40:27 - 40:33
    unfortunately, I think most spouts do not.
    I mean,
  • 40:33 - 40:36
    most, like Redis, do not support it. So in
    order
  • 40:36 - 40:40
    to get this level of guarantees, the only
    queue
  • 40:40 - 40:43
    is Kafka. And I would like to talk
  • 40:43 - 40:46
    more about Kafka and why it's so awesome,
    but
  • 40:46 - 40:48
    we can talk about that after. In general,
    even
  • 40:48 - 40:50
    if you're not using Storm, Kafka is a really
  • 40:50 - 40:53
    amazing tool that I highly recommend.
  • 40:53 - 40:57
    So, TL;DR. Storm is a really powerful platform
    for
  • 40:57 - 41:01
    writing workers. It's really great for stateful
    jobs, and
  • 41:01 - 41:04
    by stateful jobs, I mean jobs that depend
    on
  • 41:04 - 41:08
    the result of previous ones. It's good for
    complex
  • 41:08 - 41:12
    data processing flows. And that's it. I think.
    I
  • 41:12 - 41:14
    don't. My time wasn't up here, so I'm guessing
  • 41:14 - 41:15
    this is about right.
  • 41:15 - 41:20
    Am I really early or late? OK. Cool.
  • 41:20 - 41:22
    I didn't have a clock on either so I'm
  • 41:22 - 41:23
    like, guessing. It didn't start, I don't know.
    All
  • 41:23 - 41:26
    right. Well, I'm done. Thanks. All right.