博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊storm trident spout的_maxTransactionActive
阅读量:6092 次
发布时间:2019-06-20

本文共 21432 字,大约阅读时间需要 71 分钟。

本文主要研究一下storm trident spout的_maxTransactionActive

MasterBatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

TreeMap
_activeTx = new TreeMap
(); public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1); for(String spoutId: _managedSpoutIds) { _states.add(TransactionalState.newCoordinatorState(conf, spoutId)); } _currTransaction = getStoredCurrTransaction(); _collector = collector; Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING); if(active==null) { _maxTransactionActive = 1; } else { _maxTransactionActive = active.intValue(); } _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive); for(int i=0; i<_spouts.size(); i++) { String txId = _managedSpoutIds.get(i); _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context)); } LOG.debug("Opened {}", this); } public void nextTuple() { sync(); } private void sync() { // note that sometimes the tuples active may be less than max_spout_pending, e.g. // max_spout_pending = 3 // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet), // and there won't be a batch for tx 4 because there's max_spout_pending tx active TransactionStatus maybeCommit = _activeTx.get(_currTransaction); if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) { maybeCommit.status = AttemptStatus.COMMITTING; _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this); } if(_active) { if(_activeTx.size() < _maxTransactionActive) { Long curr = _currTransaction; for(int i=0; i<_maxTransactionActive; i++) { if(!_activeTx.containsKey(curr) && isReady(curr)) { // by using a monotonically increasing attempt id, downstream tasks // can be memory efficient by clearing out state for old attempts // as soon as they see a higher attempt id for a transaction Integer attemptId = _attemptIds.get(curr); if(attemptId==null) { attemptId = 0; } else { attemptId++; } _attemptIds.put(curr, attemptId); for(TransactionalState state: _states) { state.setData(CURRENT_ATTEMPTS, _attemptIds); } TransactionAttempt attempt = new TransactionAttempt(curr, attemptId); final TransactionStatus newTransactionStatus = new TransactionStatus(attempt); _activeTx.put(curr, newTransactionStatus); _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt); LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this); _throttler.markEvent(); } curr = nextTransactionId(curr); } } } } private static class TransactionStatus { TransactionAttempt attempt; AttemptStatus status; public TransactionStatus(TransactionAttempt attempt) { this.attempt = attempt; this.status = AttemptStatus.PROCESSING; } @Override public String toString() { return attempt.toString() + " <" + status.toString() + ">"; } } private static enum AttemptStatus { PROCESSING, PROCESSED, COMMITTING }复制代码
  • MasterBatchCoordinator在open方法对_maxTransactionActive进行设置,从Config.TOPOLOGY_MAX_SPOUT_PENDING(topology.max.spout.pending),配置文件默认为null,这里在该值为null时设置_maxTransactionActive为1
  • nextTuple这里对同时处理的batches的数量进行了控制,只有_activeTx中的batches处理成功或失败之后才能继续下一个batch
  • _activeTx是一个treeMap,它以transactionId为key,value是TransactionStatus,它里头包含了TransactionAttempt及AttemptStatus;AttemptStatus有三种状态,分别是PROCESSING、PROCESSED、COMMITTING

TridentSpoutCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

RotatingTransactionalState _state;    public void prepare(Map conf, TopologyContext context) {        _coord = _spout.getCoordinator(_id, conf, context);        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);        _state = new RotatingTransactionalState(_underlyingState, META_DIR);    }    public void execute(Tuple tuple, BasicOutputCollector collector) {        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {            _state.cleanupBefore(attempt.getTransactionId());            _coord.success(attempt.getTransactionId());        } else {            long txid = attempt.getTransactionId();            Object prevMeta = _state.getPreviousState(txid);            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));            _state.overrideState(txid, meta);            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));        }                    }复制代码
  • TridentSpoutCoordinator的execute方法按txid来存取meta,之后往TridentBoltExecutor发射Values(attempt, meta)

TridentBoltExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

RotatingMap
_batches; public void execute(Tuple tuple) { if(TupleUtils.isTick(tuple)) { long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate(); _lastRotate = now; } return; } String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId()); if(batchGroup==null) { // this is so we can do things like have simple DRPC that doesn't need to use batch processing _coordCollector.setCurrBatch(null); _bolt.execute(null, tuple); _collector.ack(tuple); return; } IBatchID id = (IBatchID) tuple.getValue(0); //get transaction id //if it already exists and attempt id is greater than the attempt there TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {// System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()// + " (" + _batches.size() + ")" +// "\ntuple: " + tuple +// "\nwith tracked " + tracked +// "\nwith id " + id + // "\nwith group " + batchGroup// + "\n");// // } //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()); // this code here ensures that only one attempt is ever tracked for a batch, so when // failures happen you don't get an explosion in memory usage in the tasks if(tracked!=null) { if(id.getAttemptId() > tracked.attemptId) { _batches.remove(id.getId()); tracked = null; } else if(id.getAttemptId() < tracked.attemptId) { // no reason to try to execute a previous attempt than we've already seen return; } } if(tracked==null) { tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId()); _batches.put(id.getId(), tracked); } _coordCollector.setCurrBatch(tracked); //System.out.println("TRACKED: " + tracked + " " + tuple); TupleType t = getTupleType(tuple, tracked); if(t==TupleType.COMMIT) { tracked.receivedCommit = true; checkFinish(tracked, tuple, t); } else if(t==TupleType.COORD) { int count = tuple.getInteger(1); tracked.reportedTasks++; tracked.expectedTupleCount+=count; checkFinish(tracked, tuple, t); } else { tracked.receivedTuples++; boolean success = true; try { _bolt.execute(tracked.info, tuple); if(tracked.condition.expectedTaskReports==0) { success = finishBatch(tracked, tuple); } } catch(FailedException e) { failBatch(tracked, e); } if(success) { _collector.ack(tuple); } else { _collector.fail(tuple); } } _coordCollector.setCurrBatch(null); } public static class TrackedBatch { int attemptId; BatchInfo info; CoordCondition condition; int reportedTasks = 0; int expectedTupleCount = 0; int receivedTuples = 0; Map
taskEmittedTuples = new HashMap<>(); boolean failed = false; boolean receivedCommit; Tuple delayedAck = null; public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) { this.info = info; this.condition = condition; this.attemptId = attemptId; receivedCommit = condition.commitStream == null; } @Override public String toString() { return ToStringBuilder.reflectionToString(this); } } private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) { if(tracked.failed) { failBatch(tracked); _collector.fail(tuple); return; } CoordCondition cond = tracked.condition; boolean delayed = tracked.delayedAck==null && (cond.commitStream!=null && type==TupleType.COMMIT || cond.commitStream==null); if(delayed) { tracked.delayedAck = tuple; } boolean failed = false; if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) { if(tracked.receivedTuples == tracked.expectedTupleCount) { finishBatch(tracked, tuple); } else { //TODO: add logging that not all tuples were received failBatch(tracked); _collector.fail(tuple); failed = true; } } if(!delayed && !failed) { _collector.ack(tuple); } } private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) { boolean success = true; try { _bolt.finishBatch(tracked.info); String stream = COORD_STREAM(tracked.info.batchGroup); for(Integer task: tracked.condition.targetTasks) { _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0))); } if(tracked.delayedAck!=null) { _collector.ack(tracked.delayedAck); tracked.delayedAck = null; } } catch(FailedException e) { failBatch(tracked, e); success = false; } _batches.remove(tracked.info.batchId.getId()); return success; } private void failBatch(TrackedBatch tracked, FailedException e) { if(e!=null && e instanceof ReportedFailedException) { _collector.reportError(e); } tracked.failed = true; if(tracked.delayedAck!=null) { _collector.fail(tracked.delayedAck); tracked.delayedAck = null; } }复制代码
  • TridentBoltExecutor使用RotatingMap(_batches)来存放batch的信息,key为txid,而valute为TrackedBatch
  • 在调用_bolt.execute(, tuple)方法时,传递了BatchInfo,它里头的state值为_bolt.initBatchState(batchGroup, id),通过_bolt的initBatchState得来的,这是在第一次_batches里头没有该txid信息的时候,第一次创建的时候调用
  • 这里的checkFinish也是根据batch对应的TrackedBatch信息来进行判断的;finishBatch的时候会调用_bolt.finishBatch(),传递batchInfo过去;failBatch也是对batch对应的TrackedBatch进行操作

BatchInfo

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/BatchInfo.java

public class BatchInfo {    public IBatchID batchId;    public Object state;    public String batchGroup;        public BatchInfo(String batchGroup, IBatchID batchId, Object state) {        this.batchGroup = batchGroup;        this.batchId = batchId;        this.state = state;    }}复制代码
  • BatchInfo里头包含了batchId,state以及batchGroup信息

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

public Object initBatchState(String batchGroup, Object batchId) {        return null;    }    public void execute(BatchInfo info, Tuple input) {        // there won't be a BatchInfo for the success stream        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);                _activeBatches.remove(attempt.getTransactionId());            } else {                 throw new FailedException("Received commit for different transaction attempt");            }        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {            // valid to delete before what's been committed since             // those batches will never be accessed again            _activeBatches.headMap(attempt.getTransactionId()).clear();            _emitter.success(attempt);        } else {                        _collector.setBatch(info.batchId);            _emitter.emitBatch(attempt, input.getValue(1), _collector);            _activeBatches.put(attempt.getTransactionId(), attempt);        }    }    public void finishBatch(BatchInfo batchInfo) {    }复制代码
  • TridentSpoutExecutor的execute方法,也是根据txid来区分各自batch的信息

SubtopologyBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

public Object initBatchState(String batchGroup, Object batchId) {        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {            p.startBatch(ret);        }        return ret;    }    public void execute(BatchInfo batchInfo, Tuple tuple) {        String sourceStream = tuple.getSourceStreamId();        InitialReceiver ir = _roots.get(sourceStream);        if(ir==null) {            throw new RuntimeException("Received unexpected tuple " + tuple.toString());        }        ir.receive((ProcessorContext) batchInfo.state, tuple);    }    public void finishBatch(BatchInfo batchInfo) {        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {            p.finishBatch((ProcessorContext) batchInfo.state);        }    }    protected static class InitialReceiver {        List
_receivers = new ArrayList<>(); RootFactory _factory; ProjectionFactory _project; String _stream; public InitialReceiver(String stream, Fields allFields) { // TODO: don't want to project for non-batch bolts...??? // how to distinguish "batch" streams from non-batch streams? _stream = stream; _factory = new RootFactory(allFields); List
projected = new ArrayList<>(allFields.toList()); projected.remove(0); _project = new ProjectionFactory(_factory, new Fields(projected)); } public void receive(ProcessorContext context, Tuple tuple) { TridentTuple t = _project.create(_factory.create(tuple)); for(TridentProcessor r: _receivers) { r.execute(context, _stream, t); } } public void addReceiver(TridentProcessor p) { _receivers.add(p); } public Factory getOutputFactory() { return _project; } }复制代码
  • SubtopologyBolt在initBatchState的时候,创建ProcessorContext的也是带有batchId的标识,这样子不同的batch并行的话,它们的ProcessorContext也是区分开来的
  • execute方法使用的是各自batch的ProcessorContext(batchInfo.state),调用TridentProcessor的execute方法,使用的是各自batch的ProcessorContext
  • finishBatch方法也一样,将(ProcessorContext) batchInfo.state传递给TridentProcessor.finishBatch

AggregateProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java

public void startBatch(ProcessorContext processorContext) {        _collector.setContext(processorContext);        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);    }        public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {        _collector.setContext(processorContext);        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);    }        public void finishBatch(ProcessorContext processorContext) {        _collector.setContext(processorContext);        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);    }复制代码
  • AggregateProcessor的startBatch、execute、finishBatch方法都使用了ProcessorContext的state,而该ProcessorContext从SubtopologyBolt传递过来的就是区分batch的

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {        List
parents = tridentContext.getParentTupleFactories(); if(parents.size()!=1) { throw new RuntimeException("Each operation can only have one parent"); } _context = tridentContext; _collector = new AppendCollector(tridentContext); _projection = new ProjectionFactory(parents.get(0), _inputFields); _function.prepare(conf, new TridentOperationContext(context, _projection)); } public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) { _collector.setContext(processorContext, tuple); _function.execute(_projection.create(tuple), _collector); } public void startBatch(ProcessorContext processorContext) { } public void finishBatch(ProcessorContext processorContext) { }复制代码
  • EachProcessor则是将ProcessorContext设置到_collector,然后调用_function.execute的时候,将_collector传递过去;这里的_collector为AppendCollector

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

public class AppendCollector implements TridentCollector {    OperationOutputFactory _factory;    TridentContext _triContext;    TridentTuple tuple;    ProcessorContext context;        public AppendCollector(TridentContext context) {        _triContext = context;        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());    }                    public void setContext(ProcessorContext pc, TridentTuple t) {        this.context = pc;        this.tuple = t;    }    @Override    public void emit(List values) {        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);        for(TupleReceiver r: _triContext.getReceivers()) {            r.execute(context, _triContext.getOutStreamId(), toEmit);        }    }    @Override    public void reportError(Throwable t) {        _triContext.getDelegateCollector().reportError(t);    }         public Factory getOutputFactory() {        return _factory;    }}复制代码
  • 当_function.execute使用AppendCollector进行emit的时候,AppendCollector会将这些tuple交给TupleReceiver去处理,而传递过去的context为EachProcessor设置的ProcessorContext,即每个batch自己的ProcessorContext;TupleReceiver的execute方法可能对ProcessorContext进行存取,这个也是batch维度的,比如AggregateProcessor将聚合结果存放到自己batch的processorContext.state中

小结

  • storm的trident使用[id,count]数据来告诉下游的TridentBoltExecutor来结束一个batch;而TridentBoltExecutor在接收[id,count]数据的时候,会先判断tracked.reportedTasks是否等于cond.expectedTaskReports(这个在上游的TridentBoltExecutor的parallelism大于1的时候用来聚合这些task的数据),相等之后再判断tracked.receivedTuples是否等于tracked.expectedTupleCount,相等才能进行finishBatch
  • storm的trident spout的_maxTransactionActive参数根据Config.TOPOLOGY_MAX_SPOUT_PENDING(topology.max.spout.pending)进行设置,配置文件默认为null,在该值为null时_maxTransactionActive为1
  • MasterBatchCoordinator对同时处理的batches的数量进行了控制,只有_activeTx中的batches处理成功或失败之后才能继续下一个batch;而当并行有多个_activeTx的时候,下游的TridentBoltExecutor也能够区分batch来进行处理,不会造成混乱;比如SubtopologyBolt在initBatchState的时候,创建ProcessorContext的也是带有batchId的标识,这样子不同的batch并行的话,它们的ProcessorContext也是区分开来的;SubtopologyBolt里头调用的TridentProcessor有的会使用ProcessorContext来存储结果,比如AggregateProcessor将聚合结果存放到自己batch的processorContext.state中

doc

转载地址:http://apvwa.baihongyu.com/

你可能感兴趣的文章
Java线程:什么是线程
查看>>
mysql5.7 创建一个超级管理员
查看>>
【框架整合】Maven-SpringMVC3.X+Spring3.X+MyBatis3-日志、JSON解析、表关联查询等均已配置好...
查看>>
要想成为高级Java程序员需要具备哪些知识呢?
查看>>
带着问题去学习--Nginx配置解析(一)
查看>>
onix-文件系统
查看>>
java.io.Serializable浅析
查看>>
我的友情链接
查看>>
多线程之线程池任务管理通用模板
查看>>
CSS3让长单词与URL地址自动换行——word-wrap属性
查看>>
CodeForces 580B Kefa and Company
查看>>
开发规范浅谈
查看>>
Spark Streaming揭秘 Day29 深入理解Spark2.x中的Structured Streaming
查看>>
鼠标增强软件StrokeIt使用方法
查看>>
本地连接linux虚拟机的方法
查看>>
某公司面试java试题之【二】,看看吧,说不定就是你将要做的题
查看>>
BABOK - 企业分析(Enterprise Analysis)概要
查看>>
Linux 配置vnc,开启linux远程桌面
查看>>
CentOS6.4关闭触控板
查看>>
React Native 极光推送填坑(ios)
查看>>