Twitter Storm source code walkthrough: analysis of the topology submission process

Overview

A storm cluster can be pictured as a factory. nimbus is mainly responsible for taking orders from the outside and assigning work. Besides taking external orders, nimbus also has to turn them into internal work assignments, at which point it plays the role of a dispatcher. supervisor is middle management, the director of a production workshop; its daily job is to wait for the dispatcher to hand down new work. As the workshop director, supervisor does not do the work it receives with its own hands; it has a crew of ordinary workers under it. supervisor only ever shouts two things at these workers: start work, and stop work. Note that "stop work" does not mean the worker has finished what it had in hand, only that it goes into an idle state.

The topology submission process involves the following roles.

  • storm client: responsible for submitting the topology created by the user to nimbus
  • nimbus: receives the user-submitted topology through its thrift interface
  • supervisor: downloads the latest task assignment according to the notifications it gets through the zk interface, and is responsible for launching workers
  • worker: tasks run inside a worker; each task is either of bolt type or of spout type
  • executor: executors are the actual running threads; a single executor only runs tasks of one type, i.e. the tasks in one thread are either all bolts or all spouts

A worker corresponds to a process, an executor to a thread, and a single thread can run one or more tasks. Before 0.8.0 a task mapped to exactly one thread; 0.8.0 introduced the executor concept, and with it the one-to-one mapping between task and thread was dropped, while the tasks subtree that used to exist on the zookeeper server disappeared as well. For details on this change, see http://storm-project.net/2012/08/02/storm080-released.html
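To make the process/thread/task relationship concrete, here is a minimal sketch using the public TopologyBuilder API: the parallelism hint sets the number of executors, setNumTasks the number of tasks, and Config.setNumWorkers the number of worker processes. The spout and bolt classes are the ones from the storm-starter example quoted later in this post, so the snippet is purely illustrative.

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;

public class ParallelismSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // 2 executors (threads), each running one spout task
        builder.setSpout("spout", new RandomSentenceSpout(), 2);

        // 4 executors (threads) sharing 8 bolt tasks, i.e. 2 tasks per executor
        builder.setBolt("split", new SplitSentence(), 4)
               .setNumTasks(8)
               .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setNumWorkers(3);   // 3 worker processes to spread the executors over
    }
}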

storm client

The storm client needs to run the command below to submit a topology to the storm cluster. Suppose the jar file is storm-starter-0.0.1-SNAPSHOT-standalone.jar, the main class is storm.starter.ExclamationTopology, and the name given to the topology is exclamationTopology.

#./storm jar $HOME/working/storm-starter/target/storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.ExclamationTopology exclamationTopology

What does this short command actually mean for the storm client? The source code hides no secrets, so let's open the storm client's source file.

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments. 
    The storm jars and configs in ~/.storm are put on the classpath. 
    The process is configured so that StormSubmitter 
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=["-Dstorm.jar=" + jarfile])
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], 
              extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR, 
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args) # replaces the current process and never returns

Put plainly, exec_storm_class runs the main method of the topology class passed in (the example below uses WordCountTopology from storm-starter). Let's look at that main method.

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}
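The main shown above only does something when a topology name is passed on the command line. The original storm-starter example also has an else branch that runs the topology in local mode inside the current JVM; roughly (a sketch, the exact storm-starter code may differ slightly — LocalCluster is backtype.storm.LocalCluster):

    else {
      // no name given: run everything in-process instead of submitting to a cluster
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());
      Thread.sleep(10000);
      cluster.killTopology("word-count");
      cluster.shutdown();
    }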

On the storm client side the key class, StormSubmitter, has now shown its face; submitTopology is what we really need to study.

public static void submitTopology(String name, Map stormConf,
 StormTopology topology, SubmitOptions opts) 
throws AlreadyAliveException, InvalidTopologyException 
{
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        try {
            String serConf = JSONValue.toJSONString(stormConf);
            if(localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                if(topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `"
                    + name 
                    + "` already exists on cluster");
                }
                submitJar(conf);
                try {
                    LOG.info("Submitting topology " +  name 
                    + " in distributed mode with conf " + serConf);
                    if(opts!=null) {
                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);                    
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, submittedJar, serConf, topology);                                            
                    }
                } catch(InvalidTopologyException e) {
                    LOG.warn("Topology submission exception", e);
                    throw e;
                } catch(AlreadyAliveException e) {
                    LOG.warn("Topology already alive exception", e);
                    throw e;
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " +  name);
        } catch(TException e) {
            throw new RuntimeException(e);
        }
    }

submitTopology really does two things: it uploads the jar file to the storm cluster, and it notifies the storm cluster that the upload is finished and that a particular topology can now be run.

Let's first look at submitJar, the function behind the jar upload; its call graph is shown in the figure below.

(Figure: call graph of submitJar)
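The figure itself is not reproduced here. In essence, submitJar reads the jar path from the storm.jar system property (set by the Python launcher above via -Dstorm.jar) and streams the file to nimbus through the thrift file-upload calls. A simplified sketch of that leg (the real StormSubmitter.submitJar adds buffering and progress logging):

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import backtype.storm.utils.NimbusClient;

// Sketch of the jar-upload step performed by StormSubmitter.submitJar (simplified).
public class SubmitJarSketch {
    public static String submitJar(Map conf, String localJar) throws Exception {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            FileInputStream in = new FileInputStream(localJar);
            // nimbus returns the path on its own disk where the uploaded jar will live
            String uploadLocation = client.getClient().beginFileUpload();
            byte[] buf = new byte[1024 * 1024];
            int read;
            while ((read = in.read(buf)) > 0) {
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(buf, 0, read));
            }
            client.getClient().finishFileUpload(uploadLocation);
            in.close();
            // this location is later passed as uploadedJarLocation to submitTopologyWithOpts
            return uploadLocation;
        } finally {
            client.close();
        }
    }
}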

Now look at the call graph for the second step; I drew the figure with tikz/pgf, so it was generated as a PDF.

(Figure: call graph of the topology submission step)

In both call graphs above, the functions sitting at the subtree positions were all declared in storm.thrift; if that has already slipped your mind, look back at the earlier section describing storm.thrift. On the client side these functions are all generated automatically by thrift.

For reasons of space and time I will stop here, but another very important part of the client-side submit path is the source of TopologyBuilder.java, which is worth reading on its own.

nimbus

The storm client sends the jar to nimbus through the thrift interface, and the uploaded topology is handled by the pre-defined submitTopologyWithOpts. So how does nimbus, step by step, receive the file, break the work down into concrete tasks, and finally hand them to the supervisors?

submitTopologyWithOpts

Everything still starts from thrift: service-handler in nimbus.clj is the concrete implementation of the thrift-defined Nimbus interface. The full code is not listed here, as it would take up too much space; the main thing to look at is how it implements submitTopologyWithOpts.

(^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]
        (try
          (assert (not-nil? submitOptions))
          (validate-topology-name! storm-name)
          (check-storm-active! nimbus storm-name false)
          (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                     storm-name
                     (from-json serializedConf)
                     topology)
          (swap! (:submitted-count nimbus) inc)
          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
                storm-conf (normalize-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                              (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology)
                topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
                           (optimize-topology topology)
                           topology)
                storm-cluster-state (:storm-cluster-state nimbus)]
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
              (.setup-heartbeats! storm-cluster-state storm-id)
              (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                              TopologyInitialStatus/ACTIVE :active}]
                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
              (mk-assignments nimbus)))
          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))

The storm cluster creates a directory structure on the zookeeper server; the source defining that structure is cluster.clj (config.clj defines the corresponding directories on the local filesystem).

To put the function above in plain words: it first performs the necessary checks on the uploaded topology, including its name and the file contents and format, much like the physical exam you take before starting at a new company. Once those checks pass it enters the critical section, and because it is a critical section, a lock is taken.

normalize-topology

(defn all-components [^StormTopology topology]
  (apply merge {}
         (for [f thrift/STORM-TOPOLOGY-FIELDS]
           (.getFieldValue topology f)
           )))

Once all the components are listed, the configuration information of each component can be read out.
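all-components simply merges the spouts, bolts and state-spouts maps of the thrift StormTopology struct. For reference, the same enumeration written against the thrift-generated Java API might look roughly like this (a sketch; the underscore-style getters follow the generated code):

import java.util.HashMap;
import java.util.Map;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;

public class AllComponentsSketch {
    // Collect every component's ComponentCommon, which carries its per-component JSON conf.
    public static Map<String, ComponentCommon> allComponents(StormTopology topology) {
        Map<String, ComponentCommon> all = new HashMap<String, ComponentCommon>();
        for (Map.Entry<String, SpoutSpec> e : topology.get_spouts().entrySet()) {
            all.put(e.getKey(), e.getValue().get_common());
        }
        for (Map.Entry<String, Bolt> e : topology.get_bolts().entrySet()) {
            all.put(e.getKey(), e.getValue().get_common());
        }
        for (Map.Entry<String, StateSpoutSpec> e : topology.get_state_spouts().entrySet()) {
            all.put(e.getKey(), e.getValue().get_common());
        }
        return all;
    }

    public static void printComponentConfs(StormTopology topology) {
        for (Map.Entry<String, ComponentCommon> e : allComponents(topology).entrySet()) {
            // get_json_conf() returns the per-component configuration as a JSON string
            System.out.println(e.getKey() + " -> " + e.getValue().get_json_conf());
        }
    }
}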

mk-assignments

The key function executed inside this critical section is mk-assignments. mk-assignments has two main jobs: first, work out how many tasks there are, i.e. how many spouts and how many bolts; second, on top of that computation, write the assignment into zookeeper through its API so that the supervisors can sense that there is new work to claim.

Let's take the second point first, because its logic is simple. mk-assignments runs the following code to write the corresponding data into zookeeper so that supervisors can notice that new work has been produced:

(doseq [[topology-id assignment] new-assignments
            :let [existing-assignment (get existing-assignments topology-id)
                  topology-details (.getById topologies topology-id)]]
      (if (= existing-assignment assignment)
        (log-debug "Assignment for " topology-id " hasn't changed")
        (do
          (log-message "Setting new assignment for topology id " topology-id ": " 
                  (pr-str assignment))
          (.set-assignment! storm-cluster-state topology-id assignment)
          )))

The call graph is shown in the figure below.
(Figure: call graph for writing assignments to zookeeper)

The first point involves considerably more computation and deserves a careful walk-through. The really important topic buried in it is how tasks are distributed, i.e. scheduling. You may already have noticed the directory src/clj/backtype/storm/scheduler, or the scheduler-related options in storm.yaml. So when does this scheduler actually come into play? mk-assignments indirectly calls a function with a rather odd-looking name, compute-new-topology->executor->node+port, and it is inside that oddly named function that the scheduler is invoked:

_ (.schedule (:scheduler nimbus) topologies cluster)
new-scheduler-assignments (.getAssignments cluster)
;; add more information to convert SchedulerAssignment to Assignment
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]

The assignments computed by schedule are stored in Cluster.java, which is why new-scheduler-assignments reads its data from there. Once the assignment is known, the corresponding node and port can be derived — in other words, which worker on which supervisor should execute the task.
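The scheduler itself is pluggable: if storm.yaml names a custom class under the storm.scheduler option, nimbus instantiates it and the (.schedule ...) call shown above dispatches to it; otherwise the built-in default scheduler is used. Below is a minimal sketch of such a scheduler, assuming the IScheduler interface of the later 0.8.x/0.9.x releases (prepare/schedule) and simply delegating the real executor -> (node, port) placement to the stock EvenScheduler:

import java.util.Map;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.EvenScheduler;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;

// A scheduler that just logs what needs scheduling and then hands off to EvenScheduler.
public class LoggingScheduler implements IScheduler {
    public void prepare(Map conf) {
        // called once when nimbus creates the scheduler
    }

    public void schedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails td : cluster.needsSchedulingTopologies(topologies)) {
            System.out.println("needs scheduling: " + td.getId());
        }
        // let the stock even scheduler do the actual placement
        new EvenScheduler().schedule(topologies, cluster);
    }
}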

The directory structure storm creates on the zookeeper server is shown in the figure below.

(Figure: directory structure created by storm on the zookeeper server)
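The figure is not reproduced here; as a rough reconstruction (based on the subtree names used by the 0.8.x cluster-state code and the default root /storm), the layout looks approximately like this:

/storm
 |-- assignments/{topology-id}       executor -> node+port assignments written by nimbus
 |-- storms/{topology-id}            topology status (active / inactive / killed ...)
 |-- supervisors/{supervisor-id}     supervisor heartbeats
 |-- workerbeats/{topology-id}/...   worker (executor) heartbeats
 |-- errors/{topology-id}/...        errors reported by components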

With that directory structure in mind, the question to answer now is: which of these directories are written when a topology is submitted? Under assignments a new directory is created for the freshly submitted topology — and what does the data that has to be written there look like, i.e. what is its data structure?

supervisor

As soon as a new assignment is written into zookeeper, the callback mk-synchronize-supervisor inside the supervisor is immediately woken up and executed.

Its main logic is to read the full set of new assignments from the zookeeper server, compare them with the assignments already running on this machine, and work out which ones are new. The workers that will run the concrete tasks are then brought up in the sync-processes function.

To explain clearly what the supervisor has to do during topology submission, the most important thing is to understand the processing logic of the two functions below.

  • mk-synchronize-supervisor: when the contents of the assignments subtree on the zookeeper server change, the supervisor receives a notification, and the callback that handles it is mk-synchronize-supervisor. mk-synchronize-supervisor reads all assignments — even the ones this supervisor is not responsible for — and fetches the details of each. It then works out which assignments are allocated to itself and, among those, which are new. Once the new assignments are known, it downloads the jar files from the corresponding directory on nimbus; the user's own processing code is never uploaded to the zookeeper server, it sits on the disk of the machine nimbus runs on.
  • sync-processes: after mk-synchronize-supervisor has finished the assignment-related pre-processing, it hands the job of actually starting workers to the event-manager. The event-manager runs in a separate thread, and the main function processed on that thread is sync-processes. sync-processes shuts down the workers that are no longer valid (dead, or no longer assigned to this supervisor) and then launches new workers with the new run parameters.
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    (let [conf (:conf supervisor)
          storm-cluster-state (:storm-cluster-state supervisor)
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ^LocalState local-state (:local-state supervisor)
          sync-callback (fn [& ignored] (.add event-manager this))
          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
          storm-code-map (read-storm-code-locations assignments-snapshot)
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          ;;read assignments from zookeeper
          all-assignment (read-assignments
                           assignments-snapshot
                           (:assignment-id supervisor))
          new-assignment (->> all-assignment
                              (filter-key #(.confirmAssigned isupervisor %)))
          ;; storm-ids that actually have tasks assigned to this supervisor
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
      (log-debug "Synchronizing supervisor")
      (log-debug "Storm code map: " storm-code-map)
      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
      (log-debug "All assignment: " all-assignment)
      (log-debug "New assignment: " new-assignment)
      
      ;; download code first
      ;; This might take awhile
      ;;   - should this be done separately from usual monitoring?
      ;; should we only download when topology is assigned to this supervisor?
      (doseq [[storm-id master-code-dir] storm-code-map]
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (log-message "Downloading code for storm id "
             storm-id
             " from "
             master-code-dir)
          (download-storm-code conf storm-id master-code-dir)
          (log-message "Finished downloading code for storm id "
             storm-id
             " from "
             master-code-dir)
          ))

      (log-debug "Writing new assignment "
                 (pr-str new-assignment))
      (doseq [p (set/difference (set (keys existing-assignment))
                                (set (keys new-assignment)))]
        (.killedWorker isupervisor (int p)))
      (.assigned isupervisor (keys new-assignment))
      (.put local-state
            LS-LOCAL-ASSIGNMENTS
            new-assignment)
      (reset! (:curr-assignment supervisor) new-assignment)
      ;; remove any downloaded code that's no longer assigned or active
      ;; important that this happens after setting the local assignment so that
      ;; synchronize-supervisor doesn't try to launch workers for which the
      ;; resources don't exist
      (doseq [storm-id downloaded-storm-ids]
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id "
                       storm-id)
          (rmr (supervisor-stormdist-root conf storm-id))
          ))
      (.add processes-event-manager sync-processes)
      )))

A few lines deserve attention. assignments-snapshot reads all the topology-ids and their contents from the assignments subtree on the zookeeper server, using the zk/get-children and zk/get-data primitives. The call chain is:

assignments-snapshot-->assignment-info-->clusterstate/get-data-->zk/get-data
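Storm goes through its own cluster-state abstraction here (cluster.clj and zookeeper.clj wrap the zookeeper client), but purely to illustrate what assignments-snapshot and its sync-callback amount to, this is what "list the assignments subtree and get notified on changes" looks like with a raw ZooKeeper client (default /storm root assumed, session handling omitted):

import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class AssignmentsWatchSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zk-host:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) { /* connection events ignored in this sketch */ }
        });

        Watcher onChange = new Watcher() {
            public void process(WatchedEvent event) {
                // roughly what sync-callback does: schedule the synchronize logic to run again
                System.out.println("assignments changed: " + event);
            }
        };

        // zk/get-children: list every topology-id under /storm/assignments and arm a watch,
        // zk/get-data: fetch each assignment's serialized content
        List<String> topologyIds = zk.getChildren("/storm/assignments", onChange);
        for (String id : topologyIds) {
            byte[] data = zk.getData("/storm/assignments/" + id, onChange, null);
            System.out.println(id + " -> " + data.length + " bytes of serialized assignment");
        }
    }
}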

Code download: (download-storm-code conf storm-id master-code-dir). The storm client uploaded the code to nimbus, and nimbus placed it under a directory of its own choosing; that directory structure can be found on the filesystem of the machine nimbus runs on. What the supervisor now has to do is download a copy of that code from nimbus to its local disk.
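The download itself goes back through nimbus's thrift interface, mirroring the upload shown earlier. A rough sketch of that leg, assuming the Nimbus thrift calls beginFileDownload/downloadChunk:

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import backtype.storm.utils.NimbusClient;

// Sketch of pulling a file (e.g. stormjar.jar) from nimbus down to the supervisor's disk.
public class DownloadFromNimbusSketch {
    public static void download(Map conf, String fileOnNimbus, String localFile) throws Exception {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        FileOutputStream out = new FileOutputStream(localFile);
        try {
            String id = client.getClient().beginFileDownload(fileOnNimbus);
            while (true) {
                ByteBuffer chunk = client.getClient().downloadChunk(id);
                if (chunk.remaining() == 0) break;   // an empty chunk marks the end of the file
                out.getChannel().write(chunk);
            }
        } finally {
            out.close();
            client.close();
        }
    }
}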

(.add processes-event-manager sync-processes) adds an event to the event-manager. The event-manager is a thread running on its own; the handler for the newly added event is sync-processes, whose main job was already described at the start of this section.

(defn sync-processes [supervisor]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        now (current-time-secs)
        allocated (read-allocated-workers supervisor assigned-executors now)
        keepers (filter-val
                 (fn [[state _]] (= state :valid))
                 allocated)
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        new-worker-ids (into
                        {}
                        (for [port (keys reassign-executors)]
                          [port (uuid)]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;     - read pids, kill -9 and individually remove file
    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch
  
    (log-debug "Syncing processes")
    (log-debug "Assigned executors: " assigned-executors)
    (log-debug "Allocated: " allocated)
    (doseq [[id [state heartbeat]] allocated]
      (when (not= :valid state)
        (log-message
         "Shutting down and clearing state for id " id
         ". Current supervisor time: " now
         ". State: " state
         ", Heartbeat: " (pr-str heartbeat))
        (shutdown-worker supervisor id)
        ))
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id)))
    (.put local-state LS-APPROVED-WORKERS
          (merge
           (select-keys (.get local-state LS-APPROVED-WORKERS)
                        (keys keepers))
           (zipmap (vals new-worker-ids) (keys new-worker-ids))
           ))
    (wait-for-workers-launch
     conf
     (dofor [[port assignment] reassign-executors]
       (let [id (new-worker-ids port)]
         (log-message "Launching worker with assignment "
                      (pr-str assignment)
                      " for this supervisor "
                      (:supervisor-id supervisor)
                      " on port "
                      port
                      " with id "
                      id
                      )
         (launch-worker supervisor
                        (:storm-id assignment)
                        port
                        id)
         id)))
    ))

worker

Workers are brought up by the supervisor through the function launch-worker. There is no external command that explicitly starts or stops a worker — except, of course, kill :).

The worker's main jobs are:

  • sending heartbeat messages
  • receiving tuple messages from outside
  • sending tuple messages outward

This work is wired up in mk-worker, where the handlers are assigned. The source is not listed line by line here.

executor

An executor goes through its initialization when the worker calls mk-executor.

(defn mk-executor [worker executor-id]
 (let [executor-data (mk-executor-data worker executor-id)
   _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
   task-datas (->> executor-data
                   :task-ids
                   (map (fn [t] [t (task/mk-task executor-data t)]))
                   (into {})
                   (HashMap.))
   _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
   report-error-and-die (:report-error-and-die executor-data)
   component-id (:component-id executor-data)

   ;; starting the batch-transfer->worker ensures that anything publishing to that queue 
   ;; doesn't block (because it's a single threaded queue and the caching/consumer started
   ;; trick isn't thread-safe)
   system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
   handlers (with-error-reaction report-error-and-die
              (mk-threads executor-data task-datas))
   threads (concat handlers system-threads)]    
    (setup-ticks! worker executor-data)

    (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
    ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
    (reify
      RunningExecutor
      (render-stats [this]
        (stats/render-stats! (:stats executor-data)))
      (get-executor-id [this]
        executor-id )
      Shutdownable
      (shutdown
        [this]
        (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
        (disruptor/halt-with-interrupt! (:receive-queue executor-data))
        (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
        (doseq [t threads]
          (.interrupt t)
          (.join t))
        
        (doseq [user-context (map :user-context (vals task-datas))]
          (doseq [hook (.getHooks user-context)]
            (.cleanup hook)))
        (.disconnect (:storm-cluster-state executor-data))
        (when @(:open-or-prepare-was-called? executor-data)
          (doseq [obj (map :object (vals task-datas))]
            (close-component executor-data obj)))
        (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
        )))

In the code above, mk-threads is what creates the threads for a spout or a bolt.

mk-threads uses Clojure's multimethod mechanism — to borrow a term from Java or C++, function overloading. In Clojure, defmulti declares such a multimethod; here it is dispatched on the executor's type, with one defmethod for :spout and one for :bolt.

mk-threads is a little long and its logic is fairly involved, so it is better to get a rough picture first and then dig into the details.

  • async-loop: the main function the thread runs; it plays a role similar to the start_routine argument of pthread_create
  • tuple-action-fn: both spouts and bolts receive tuples; they process them differently, but each has a handler by this same name, tuple-action-fn
  • event-handler: the thread created here again uses the disruptor pattern, and an important part of that pattern is defining the corresponding event-handler. The tuple-action-fn mentioned above is invoked from inside the event-handler.

The calling logic is shown in the figure below.

(Figure: call relationships between async-loop, event-handler and tuple-action-fn)

spout

Let's first look at what mk-threads does in the spout case, starting with this async-loop:

[(async-loop
      (fn []
        ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
        (while (not @(:storm-active-atom executor-data))
          (Thread/sleep 100))
        
        (log-message "Opening spout " component-id ":" (keys task-datas))
        (doseq [[task-id task-data] task-datas
                :let [^ISpout spout-obj (:object task-data)
                      tasks-fn (:tasks-fn task-data)
                      send-spout-msg (fn [out-stream-id values message-id out-task-id]
                                       (.increment emitted-count)
   (let [out-tasks (if out-task-id
                     (tasks-fn out-task-id out-stream-id values)
                     (tasks-fn out-stream-id values))
         rooted? (and message-id has-ackers?)
         root-id (if rooted? (MessageId/generateId rand))
         out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
     (fast-list-iter [out-task out-tasks id out-ids]
                     (let [tuple-id (if rooted?
                                      (MessageId/makeRootId root-id id)
                                      (MessageId/makeUnanchored))
                           out-tuple (TupleImpl. worker-context
                                                 values
                                                 task-id
                                                 out-stream-id
                                                 tuple-id)]
                       (transfer-fn out-task
                                    out-tuple
                                    overflow-buffer)
                       ))
     (if rooted?
       (do
         (.put pending root-id [task-id
                                message-id
                                {:stream out-stream-id :values values}
                                (if (sampler) (System/currentTimeMillis))])
         (task/send-unanchored task-data
                               ACKER-INIT-STREAM-ID
                               [root-id (bit-xor-vals out-ids) task-id]
                               overflow-buffer))
       (when message-id
         (ack-spout-msg executor-data task-data message-id
                        {:stream out-stream-id :values values}
                        (if (sampler) 0))))
     (or out-tasks [])
     ))]]
          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
          (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
                                                   :receive receive-queue}
                                                  storm-conf (:user-context task-data))

          (.open spout-obj
                 storm-conf
                 (:user-context task-data)
                 (SpoutOutputCollector.
                  (reify ISpoutOutputCollector
                    (^List emit [this ^String stream-id ^List tuple ^Object message-id]
                      (send-spout-msg stream-id tuple message-id nil)
                      )
                    (^void emitDirect [this ^int out-task-id ^String stream-id
                                       ^List tuple ^Object message-id]
                      (send-spout-msg stream-id tuple message-id out-task-id)
                      )
                    (reportError [this error]
                      (report-error error)
                      )))))
        (reset! open-or-prepare-was-called? true) 
        (log-message "Opened spout " component-id ":" (keys task-datas))
        (setup-metrics! executor-data)
        
        (disruptor/consumer-started! (:receive-queue executor-data))
        (fn []
          ;; This design requires that spouts be non-blocking
          (disruptor/consume-batch receive-queue event-handler)
          
          ;; try to clear the overflow-buffer
          (try-cause
            (while (not (.isEmpty overflow-buffer))
              (let [[out-task out-tuple] (.peek overflow-buffer)]
                (transfer-fn out-task out-tuple false nil)
                (.removeFirst overflow-buffer)))
          (catch InsufficientCapacityException e
            ))
          
          (let [active? @(:storm-active-atom executor-data)
                curr-count (.get emitted-count)]
            (if (and (.isEmpty overflow-buffer)
                     (or (not max-spout-pending)
                         (< (.size pending) max-spout-pending)))
              (if active?
                (do
                  (when-not @last-active
                    (reset! last-active true)
                    (log-message "Activating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.activate spout)))
               
                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
                (do
                  (when @last-active
                    (reset! last-active false)
                    (log-message "Deactivating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
                  ;; TODO: log that it's getting throttled
                  (Time/sleep 100))))
            (if (and (= curr-count (.get emitted-count)) active?)
              (do (.increment empty-emit-streak)
                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
              (.set empty-emit-streak 0)
              ))           
          0))
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))

For a spout, how is received data handled? Everything comes back to disruptor/consume-batch — note where it is called in the loop function near the end of the code above.

Now look at how the event-handler is defined: event-handler (mk-task-receiver executor-data tuple-action-fn). With that, the call graph above can be strung together.

The tuple-action-fn of a spout is defined as follows. This tuple-action-fn is important: if you still remember the previous post on this blog about the path a tuple message travels, then the entire receive-side processing logic of tuples lives right here.

(fn [task-id ^TupleImpl tuple]
  (let [stream-id (.getSourceStreamId tuple)]
    (condp = stream-id
 Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
 Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
 (let [id (.getValue tuple 0)
       [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
   (when spout-id
     (when-not (= stored-task-id task-id)
       (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
     (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
       (condp = stream-id
         ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
                                            spout-id tuple-finished-info time-delta)
         ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
                                              spout-id tuple-finished-info time-delta)
         )))
   ;; TODO: on failure, emit tuple to failure stream
   ))))


The creation of the bolt-side threads and their message-receiving handler will not be listed one by one here; by now you should have no trouble analyzing them on your own.