o b@sLdZddlmZddlmZeejejGdddZGdddeZdS) z Producer-Consumer Proxy. ) implementer) interfacesc@szeZdZdZdZdZdZdZdZdZ dZ ddZ ddZ d d Z d d Zd dZddZddZddZdefddZdS)BasicProducerConsumerProxyaa I can act as a man in the middle between any Producer and Consumer. @ivar producer: the Producer I subscribe to. @type producer: L{IProducer} @ivar consumer: the Consumer I publish to. @type consumer: L{IConsumer} @ivar paused: As a Producer, am I paused? @type paused: bool NTFcCs*g|_|dur||_|||jdSdSN)_bufferconsumerregisterProducer iAmStreaming)selfrr 7/usr/lib/python3/dist-packages/twisted/protocols/pcp.py__init__#s z#BasicProducerConsumerProxy.__init__cCsd|_|jr |jdSdSNT)pausedproducerpauseProducingr r r r r+sz)BasicProducerConsumerProxy.pauseProducingcCsXd|_|jr|jd|jg|jdd<n|jsd|_|jdur*|jdSdS)NFT) rrrwritejoinr outstandingPullrresumeProducingrr r r r0s z*BasicProducerConsumerProxy.resumeProducingcCs*|jdur |j|jdur|`dSdSr)r stopProducingrrr r r r=s   z(BasicProducerConsumerProxy.stopProducingcCsF|js |js|js|j|dS|jdur!|j|d|_dSdSNF)rr rrappendrrr datar r r rEs    z BasicProducerConsumerProxy.writecCs |jdur |j|dSr)rfinishunregisterProducerrr r r rNs   z!BasicProducerConsumerProxy.finishcCs||_||_dSr)rproducerIsStreamingr r streamingr r r rSs z+BasicProducerConsumerProxy.registerProducercCs*|jdur |`|`|jr|jdSdSr)rrrrrr r r rWs z-BasicProducerConsumerProxy.unregisterProducerreturncCs"d|jdt|dd|jdS)N<@xz around >) __class__idrrr r r __repr__^s"z#BasicProducerConsumerProxy.__repr__)__name__ __module__ __qualname____doc__rrrr rrstoppedr rrrrrrrstrr)r r r r rs$   rc@sLeZdZdZdZdZdZddZddZdd Z d d Z d d Z ddZ dS)ProducerConsumerProxyzProducerConsumerProxy with a finite buffer. When my buffer fills up, I have my parent Producer pause until my buffer has room in it again. iFcCs d|_dSr)rrr r r ros z$ProducerConsumerProxy.pauseProducingcCsd|_|jr5d|j}||}|t|kr-||d}|jr$Jd|g|jdd<n g|jdd<nd}|jrI|rI|jsI|jdurI|j|jsP| |_ |j dur{t dd|jD}|j rq||j krqd|_ |j dS|j r}|j dSdSdS)NFr.Streaming producer did not write all its data.rcs|]}t|VqdSrlen.0sr r r z8ProducerConsumerProxy.resumeProducing..)rrr_writeSomeDatar4r unregisteredrrrrsumproducerPaused bufferSizer)r r bytesSentunsent bytesBufferedr r r rts@        z%ProducerConsumerProxy.resumeProducingcCs|js |js|js|j|n+|jdur;|jrJd||}d|_|t|ks;|jr1Jd|j||d|jdur\|j r^t dd|jD}||j kr`|j d|_ dSdSdSdS)Nz9Writing fresh data to consumer before my buffer is empty!Fr1csr2rr3r5r r r r8r9z.ProducerConsumerProxy.write..T)rr rrrrr:r4rrr<r>rr=)r rr?rAr r r rs,      zProducerConsumerProxy.writecCs(d|_t||||s|dSdSr)r;rrrr r r r rs  z&ProducerConsumerProxy.registerProducercCs:|jdur |`|`d|_|jr|js|jdSdSdSr)rrr;rrrrr r r rs  z(ProducerConsumerProxy.unregisterProducercCs"|jdurdS|j|t|S)z`Write as much of this data as possible. @returns: The number of bytes written. Nr)rrr4rr r r r:s  z$ProducerConsumerProxy._writeSomeDataN) r*r+r,r-r>r=r;rrrrrr:r r r r r0bs- r0N) r-zope.interfacertwisted.internetr IProducer IConsumerrr0r r r r s    S