00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef _SessionImpl_
00023 #define _SessionImpl_
00024
00025 #include "Demux.h"
00026 #include "Execution.h"
00027 #include "Results.h"
00028
00029 #include "qpid/SessionId.h"
00030 #include "qpid/SessionState.h"
00031 #include "boost/shared_ptr.hpp"
00032 #include "boost/weak_ptr.hpp"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/ChannelHandler.h"
00035 #include "qpid/framing/SequenceNumber.h"
00036 #include "qpid/framing/AMQP_ClientOperations.h"
00037 #include "qpid/framing/AMQP_ServerProxy.h"
00038 #include "qpid/sys/Semaphore.h"
00039 #include "qpid/sys/StateMonitor.h"
00040 #include "qpid/sys/ExceptionHolder.h"
00041
00042 #include <boost/optional.hpp>
00043
00044 namespace qpid {
00045
// Forward declarations of qpid::framing types named in the SessionImpl
// declaration below.
// NOTE(review): SessionImpl also uses the nested name
// framing::FrameSet::shared_ptr, which requires FrameSet to be a complete
// type; its definition presumably arrives through one of the headers
// included above -- confirm.
00046 namespace framing {
00047
00048 class FrameSet;
00049 class MethodContent;
00050 class SequenceSet;
00051
00052 }
00053
00054 namespace client {
00055
// Forward declarations of qpid::client types that SessionImpl only names:
// Future is used as a return type, ConnectionImpl as a smart-pointer
// target, and SessionHandler in a friend declaration.
00056 class Future;
00057 class ConnectionImpl;
00058 class SessionHandler;
00059
// SessionImpl: session implementation class of the qpid::client namespace.
//
// Publicly it is a framing::FrameHandler::InOutHandler and an Execution.
// It privately inherits the AMQP_ClientOperations SessionHandler,
// ExecutionHandler and MessageHandler interfaces, so the operations declared
// in the private section are not callable by ordinary users of this class.
//
// NOTE(review): only declarations are visible in this header; behavioural
// remarks below that are marked "presumably" must be confirmed against the
// implementation file.
00061 class SessionImpl : public framing::FrameHandler::InOutHandler,
00062 public Execution,
00063 private framing::AMQP_ClientOperations::SessionHandler,
00064 private framing::AMQP_ClientOperations::ExecutionHandler,
00065 private framing::AMQP_ClientOperations::MessageHandler
00066 {
00067 public:
// Construct from a session name and a shared pointer to the connection.
00068 SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
00069 ~SessionImpl();
00070
00071
00072
// Returns a shared pointer to a FrameSet.
// NOTE(review): presumably the next received frame set -- confirm.
00073 framing::FrameSet::shared_ptr get();
00074
// Returns the session id by value.
00075 const SessionId getId() const;
00076
// Accessors for the 16-bit channel number.
00077 uint16_t getChannel() const;
00078 void setChannel(uint16_t channel);
00079
// Session lifecycle operations. The private State enum below lists the
// states involved; the exact transitions are not visible in this header.
00080 void open(uint32_t detachedLifetime);
00081 void close();
00082 void resume(shared_ptr<ConnectionImpl>);
00083 void suspend();
00084
// NOTE(review): presumably throws if the session is not open -- confirm.
00085 void assertOpen() const;
00086
// Send a command, optionally accompanied by content, and return a Future.
00087 Future send(const framing::AMQBody& command);
00088 Future send(const framing::AMQBody& command, const framing::MethodContent& content);
00089 Future send(const framing::AMQBody& command, const framing::FrameSet& content);
// Takes a single frame by non-const reference.
00090 void sendRawFrame(framing::AMQFrame& frame);
00091
// Access to the Demux member, plus command-completion bookkeeping keyed by
// SequenceNumber / SequenceSet.
00092 Demux& getDemux();
00093 void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
00094 void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
00095 bool isComplete(const framing::SequenceNumber& id);
00096 bool isCompleteUpTo(const framing::SequenceNumber& id);
00097 void waitForCompletion(const framing::SequenceNumber& id);
00098 void sendCompletion();
00099 void sendFlush();
00100
// Takes an ExceptionHolder; see also the private setExceptionLH variant.
00101 void setException(const sys::ExceptionHolder&);
00102
00103
// Connection-level notifications carrying an error code and/or text.
00104 void connectionClosed(uint16_t code, const std::string& text);
00105 void connectionBroke(const std::string& text);
00106
// Timeout accessors; the parameter name indicates the unit is seconds.
// NOTE(review): the meaning of setTimeout's return value (presumably the
// timeout actually granted) is not visible here -- confirm.
00108 uint32_t setTimeout(uint32_t requestedSeconds);
00109
00111 uint32_t getTimeout() const;
00112
// NOTE(review): presumably selects between the connectionShared and
// connectionWeak members declared below (see the weakPtr flag) -- confirm.
// The argument defaults to true.
00116 void setWeakPtr(bool weak=true);
00117
// Returns a shared pointer to the ConnectionImpl.
00121 shared_ptr<ConnectionImpl> getConnection();
00122
// NOTE(review): presumably clears the autoDetach flag declared below --
// confirm.
00124 void disableAutoDetach();
00125
00126 private:
// Session states tracked by the `state` monitor member.
00127 enum State {
00128 INACTIVE,
00129 ATTACHING,
00130 ATTACHED,
00131 DETACHING,
00132 DETACHED
00133 };
// Short aliases for the privately inherited operation interfaces. Inside
// this class the name SessionHandler therefore refers to the framing
// interface, which is why the friend declaration at the end of the class
// spells out client::SessionHandler explicitly.
00134 typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
00135 typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
00136 typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
// State monitor over the State enum, parameterised with DETACHED (the last
// enumerator), and its associated set-of-states type.
00137 typedef sys::StateMonitor<State, DETACHED> StateMonitor;
00138 typedef StateMonitor::Set States;
00139
// Declared inline; definitions are not in this header.
00140 inline void setState(State s);
00141 inline void waitFor(State);
00142
// NOTE(review): the "LH" suffix presumably means "lock held" -- confirm.
00143 void setExceptionLH(const sys::ExceptionHolder&);
// Zero-argument overload; distinct from detach(const std::string&) below.
00144 void detach();
00145
00146 void check() const;
00147 void checkOpen() const;
00148 void handleClosed();
00149
// Frame handling entry points.
// NOTE(review): handleIn/handleOut presumably implement the
// FrameHandler::InOutHandler base interface -- confirm.
00150 void handleIn(framing::AMQFrame& frame);
00151 void handleOut(framing::AMQFrame& frame);
// proxyOut is the member function bound into the ioHandler member below.
00158 void proxyOut(framing::AMQFrame& frame);
00159 void sendFrame(framing::AMQFrame& frame, bool canBlock);
00160 void deliver(framing::AMQFrame& frame);
00161
// Internal send helpers; the content pointer defaults to null (= 0).
00162 Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
00163 void sendContent(const framing::MethodContent&);
00164 void waitForCompletionImpl(const framing::SequenceNumber& id);
00165
00166 void sendCompletionImpl();
00167
00168
00169
// NOTE(review): the following group presumably overrides the privately
// inherited SessionHandler operations -- confirm against the base class.
00170 void attach(const std::string& name, bool force);
00171 void attached(const std::string& name);
00172 void detach(const std::string& name);
00173 void detached(const std::string& name, uint8_t detachCode);
00174 void requestTimeout(uint32_t timeout);
00175 void timeout(uint32_t timeout);
00176 void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
00177 void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
00178 void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
00179 void completed(const framing::SequenceSet& commands, bool timelyReply);
00180 void knownCompleted(const framing::SequenceSet& commands);
00181 void flush(bool expected, bool confirmed, bool completed);
00182 void gap(const framing::SequenceSet& commands);
00183
00184
00185
// NOTE(review): the following group presumably overrides the privately
// inherited ExecutionHandler operations -- confirm against the base class.
00186 void sync();
00187 void result(const framing::SequenceNumber& commandId, const std::string& value);
00188 void exception(uint16_t errorCode,
00189 const framing::SequenceNumber& commandId,
00190 uint8_t classCode,
00191 uint8_t commandCode,
00192 uint8_t fieldIndex,
00193 const std::string& description,
00194 const framing::FieldTable& errorInfo);
00195
00196
00197
00198
// NOTE(review): the following group presumably overrides the privately
// inherited MessageHandler operations -- confirm against the base class.
// This resume overload is distinct from the public
// resume(shared_ptr<ConnectionImpl>).
00199 void accept(const qpid::framing::SequenceSet&);
00200 void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
00201 void release(const qpid::framing::SequenceSet&, bool);
00202 qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
00203 void setFlowMode(const std::string&, uint8_t);
00204 void flow(const std::string&, uint8_t, uint32_t);
00205 void stop(const std::string&);
00206
00207
// Data members. C++ constructs non-static members in declaration order, so
// reordering these declarations can change construction order.
00208 sys::ExceptionHolder exceptionHolder;
// Declared mutable, so const member functions may modify them.
00209 mutable StateMonitor state;
00210 mutable sys::Semaphore sendLock;
00211 uint32_t detachedLifetime;
00212 const uint64_t maxFrameSize;
00213 const SessionId id;
00214
// Two handles to the connection, one shared and one weak, plus a flag.
// NOTE(review): weakPtr presumably selects which handle is in use (see
// setWeakPtr) -- confirm.
00215 shared_ptr<ConnectionImpl> connectionShared;
00216 boost::weak_ptr<ConnectionImpl> connectionWeak;
00217 bool weakPtr;
00218
// Frame handler bound at compile time to SessionImpl::proxyOut.
00219 framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
00220 framing::ChannelHandler channel;
00221 framing::AMQP_ServerProxy::Session proxy;
00222
00223 Results results;
00224 Demux demux;
00225 framing::FrameSet::shared_ptr arriving;
00226
// Sequence tracking for inbound ("In") and outbound ("Out") commands.
00227 framing::SequenceSet incompleteIn;
00228 framing::SequenceSet completedIn;
00229 framing::SequenceSet incompleteOut;
00230 framing::SequenceSet completedOut;
00231 framing::SequenceNumber nextIn;
00232 framing::SequenceNumber nextOut;
00233
00234 SessionState sessionState;
00235
00236
// Raw pointer to a Semaphore.
// NOTE(review): ownership and lifetime are not visible in this header --
// confirm who allocates and frees it.
00237 sys::Semaphore* sendMsgCredit;
00238
00239 bool autoDetach;
00240
// Grants client::SessionHandler access to private members. The explicit
// client:: qualification is needed because of the SessionHandler typedef
// declared earlier in this class.
00241 friend class client::SessionHandler;
00242 };
00243
00244 }}
00245
00246 #endif