
    gE(                     N   d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ dZi Zd Z ed       G d d             Z G d de      Z ed       G d de             Z ed       G d de             Z G d d      Z G d d      Zy)z$Async I/O backend support utilities.    N)deque)Empty)sleep)WeakKeyDictionary)detect_environment)states)TimeoutError)THREAD_TIMEOUT_MAX)AsyncBackendMixinBaseResultConsumerDrainerregister_drainerc                       fd}|S )z5Decorator used to register a new result drainer type.c                     | t         <   | S N)drainers)clsnames    ]/var/www/html/hubwallet-dev/venv/lib/python3.12/site-packages/celery/backends/asynchronous.py_innerz register_drainer.<locals>._inner   s    
     )r   r   s   ` r   r   r      s     Mr   defaultc                   2    e Zd ZdZd Zd Zd ZddZd	dZy)
r   zResult draining service.c                     || _         y r   )result_consumer)selfr   s     r   __init__zDrainer.__init__$   s
    .r   c                      y r   r   r   s    r   startzDrainer.start'       r   c                      y r   r   r    s    r   stopzDrainer.stop*   r"   r   Nc              #   V  K   |xs | j                   j                  }t        j                         }	 |r.t        j                         |z
  |k\  rt	        j
                         	 | j                  |||       |r |        |j                  ry ^# t        j
                  $ r Y ,w xY wwNtimeout)r   drain_eventstime	monotonicsocketr(   wait_forready)r   pr(   intervalon_intervalwait
time_starts          r   drain_events_untilzDrainer.drain_events_until-   s     8t++88^^%
4>>+j8GCnn&&mmAtXm>> ww  >> s*   A B)#B 9B)B&#B)%B&&B)c                      ||       y r&   r   r   r/   r2   r(   s       r   r-   zDrainer.wait_for>   s
    Wr   )N   NNr   )	__name__
__module____qualname____doc__r   r!   r$   r4   r-   r   r   r   r   r       s    "/"r   r   c                   P     e Zd ZdZdZdZd Zd Z fdZd Z	d Z
d Zd	dZ xZS )
greenletDrainerNc                      y)z,create new self._drain_complete_event objectNr   r    s    r   _create_drain_complete_eventz,greenletDrainer._create_drain_complete_eventG       r   c                      y)z5raise self._drain_complete_event for wakeup .wait_forNr   r    s    r   _send_drain_complete_eventz*greenletDrainer._send_drain_complete_eventK   r@   r   c                     t        |   |i | t        j                         | _        t        j                         | _        t        j                         | _        | j                          y r   )superr   	threadingEvent_started_stopped	_shutdownr?   )r   argskwargs	__class__s      r   r   zgreenletDrainer.__init__O   sK    $)&)!)!)"*))+r   c                    | j                   j                          | j                  j                         sX	 | j                  j                  d       | j                          | j                          | j                  j                         sX| j                  j                          y # t        j                  $ r Y Kw xY w)Nr7   r'   )rG   setrH   is_setr   r)   rB   r?   r,   r(   rI   r    s    r   runzgreenletDrainer.runV   s    --&&($$11!1<//1113	 --&&( 	 >> s   <B( (B>=B>c                     | j                   j                         s;| j                  | j                        | _        | j                   j                          y y r   )rG   rO   spawnrP   _gr2   r    s    r   r!   zgreenletDrainer.starta   s;    }}##%jj*DGMM  &r   c                     | j                   j                          | j                          | j                  j	                  t
               y r   )rH   rN   rB   rI   r2   r
   r    s    r   r$   zgreenletDrainer.stopf   s1    '')./r   c                 v    | j                          |j                  s| j                  j                  |       y y r&   )r!   r.   _drain_complete_eventr2   r6   s       r   r-   zgreenletDrainer.wait_fork   s.    

ww&&++G+< r   r   )r8   r9   r:   rR   rS   rV   r?   rB   r   rP   r!   r$   r-   __classcell__)rL   s   @r   r=   r=   B   s6    E	B ,	!
0
=r   r=   eventletc                       e Zd Zd Zd Zd Zy)eventletDrainerc                 6    ddl m}m}  ||      } |d       |S )Nr   )r   rR   )rX   r   rR   )r   funcr   rR   gs        r   rR   zeventletDrainer.spawnt   s    )$Kar   c                 (    ddl m}  |       | _        y Nr   )rF   )eventlet.eventrF   rV   r   rF   s     r   r?   z,eventletDrainer._create_drain_complete_eventz   s    (%*W"r   c                 8    | j                   j                          y r   )rV   sendr    s    r   rB   z*eventletDrainer._send_drain_complete_event~   s    ""'')r   Nr8   r9   r:   rR   r?   rB   r   r   r   rZ   rZ   q   s    -*r   rZ   geventc                       e Zd Zd Zd Zd Zy)geventDrainerc                 R    dd l }|j                  |      }|j                  d       |S )Nr   )re   rR   r   )r   r\   re   r]   s       r   rR   zgeventDrainer.spawn   s#    LLQr   c                 (    ddl m}  |       | _        y r_   )gevent.eventrF   rV   ra   s     r   r?   z*geventDrainer._create_drain_complete_event   s    &%*W"r   c                 X    | j                   j                          | j                          y r   )rV   rN   r?   r    s    r   rB   z(geventDrainer._send_drain_complete_event   s     ""&&())+r   Nrd   r   r   r   rg   rg      s    -,r   rg   c                   r    e Zd ZdZd ZddZddZd ZddZddZ	d Z
d	 Zd
 Z	 ddZ	 ddZed        Zy)r   z.Mixin for backends that enables the async API.c                 6    || j                   j                  |<   y r   )r   buckets)r   resultbuckets      r   _collect_intozAsyncBackendMixin._collect_into   s    /5$$V,r   c              +   \  K   | j                          |j                  }|s
t               t               }|D ]P  }t	        |d      s|j                  |       !|j                  r|j                  |       ?| j                  ||       R  | j                  |fd|i|D ]Y  }|s|j                         }t	        |d      s|j                  |j                  f n|j                  |j                  f |rT[ |r.|j                         }|j                  |j                  f |r-y y w)N_cacheno_ack)_ensure_not_eagerresultsStopIterationr   hasattrappendrs   rq   _wait_for_pendingpopleftidchildren)r   ro   rt   rK   rv   rp   node_s           r   iter_nativezAsyncBackendMixin.iter_native   s     ../!  	1D4*d#d#""40	1 (''HvHH 	/A~~'tX.''4==00''4;;.. 	/ >>#D''4;;&& s   B"D,%AD,90D,*D,c                     |r$| j                   j                  j                          	 | j                  |       |S # t        $ r" | j                  |j                  ||       Y |S w xY w)N)weak)r   drainerr!   _maybe_resolve_from_bufferr   _add_pending_resultr|   )r   ro   r   start_drainers       r   add_pending_resultz$AsyncBackendMixin.add_pending_result   si      ((..0	C++F3   	C$$VYYT$B	Cs   ; 'A&%A&c                 l    |j                  | j                  j                  |j                               y r   )_maybe_set_cache_pending_messagestaker|   r   ro   s     r   r   z,AsyncBackendMixin._maybe_resolve_from_buffer   s%     6 6 ; ;FII FGr   c                     | j                   \  }}||vr4|j                  |vr%||r|n||<   | j                  j                  |       y y y r   )_pending_resultsr|   r   consume_from)r   task_idro   r   concreteweak_s         r   r   z%AsyncBackendMixin._add_pending_result   sN    //%%FIIX$=5;dU'2  --g6 %>r   c                     | j                   j                  j                          |D cg c]  }| j                  ||d       c}S c c}w )NF)r   r   )r   r   r!   r   )r   rv   r   ro   s       r   add_pending_resultsz%AsyncBackendMixin.add_pending_results   sL    $$**,%' ''T'O ' 	' 's   Ac                 ^    | j                  |j                         | j                  |       |S r   )_remove_pending_resultr|   on_result_fulfilledr   s     r   remove_pending_resultz'AsyncBackendMixin.remove_pending_result   s'    ##FII.  (r   c                 J    | j                   D ]  }|j                  |d         y r   )r   popr   r   mappings      r   r   z(AsyncBackendMixin._remove_pending_result   s%    ,, 	'GKK&	'r   c                 N    | j                   j                  |j                         y r   )r   
cancel_forr|   r   s     r   r   z%AsyncBackendMixin.on_result_fulfilled   s    ''		2r   Nc                 x    | j                           | j                  |fi |D ]  } |j                  ||      S )N)callback	propagate)ru   rz   maybe_throw)r   ro   r   r   rK   r   s         r   wait_for_pendingz"AsyncBackendMixin.wait_for_pending   sH     '''9&9 	A	!!8y!IIr   c                 D     | j                   j                  |f|||d|S )N)r(   r1   
on_message)r   rz   )r   ro   r(   r1   r   rK   s         r   rz   z#AsyncBackendMixin._wait_for_pending   s9     6t##55
##

 
 	
r   c                      yNTr   r    s    r   is_asynczAsyncBackendMixin.is_async   s    r   )T)FT)Fr   NNN)r8   r9   r:   r;   rq   r   r   r   r   r   r   r   r   r   rz   propertyr   r   r   r   r   r      s^    86':H7'

'3 37J FJ
  r   r   c                   n    e Zd ZdZd Zd Zd ZddZd Zd Z	d	 Z
d
 ZddZ	 ddZddZd Zd Zd Zy)r   z2Manager responsible for consuming result messages.c                     || _         || _        || _        || _        || _        d | _        t               | _        t        t                  |       | _
        y r   )backendappacceptr   r   r   r   rn   r   r   r   )r   r   r   r   pending_resultspending_messagess         r   r   zBaseResultConsumer.__init__   sM     /!1(* 2 45d;r   c                     t               r   NotImplementedError)r   initial_task_idrK   s      r   r!   zBaseResultConsumer.start       !##r   c                      y r   r   r    s    r   r$   zBaseResultConsumer.stop   r"   r   Nc                     t               r   r   )r   r(   s     r   r)   zBaseResultConsumer.drain_events  r   r   c                     t               r   r   r   r   s     r   r   zBaseResultConsumer.consume_from  r   r   c                     t               r   r   r   s     r   r   zBaseResultConsumer.cancel_for	  r   r   c                     | j                   j                          t               | _         d | _        | j	                          y r   )rn   clearr   r   on_after_forkr    s    r   _after_forkzBaseResultConsumer._after_fork  s/    (*r   c                      y r   r   r    s    r   r   z BaseResultConsumer.on_after_fork  r"   r   c                 >    | j                   j                  |||      S )Nr(   r1   )r   r4   )r   r/   r(   r1   s       r   r4   z%BaseResultConsumer.drain_events_until  s%    ||..wK / 9 	9r   c              +   0  K    | j                   |fd|i| | j                  |c}| _        	 | j                  |j                  ||      D ]  }d  t	        d        	 || _        y # t
        j                  $ r t        d      w xY w# || _        w xY ww)Nr(   r   r   zThe operation timed out.)on_wait_for_pendingr   r4   on_readyr   r,   r(   r	   )r   ro   r(   r1   r   rK   	prev_on_mr   s           r   rz   z$BaseResultConsumer._wait_for_pending  s      	!  CCFC%)__j"	4?		(,,OOW + - -  a	 (DO ~~ 	;9::	; (DOs(   *B2A(  B(BB
 
	BBc                      y r   r   )r   ro   r(   rK   s       r   r   z&BaseResultConsumer.on_wait_for_pending)  r"   r   c                 <    | j                  |j                  |       y r   )on_state_changepayload)r   messages     r   on_out_of_band_resultz(BaseResultConsumer.on_out_of_band_result,  s    W__g6r   c                 f    | j                   D ]	  }	 ||   c S  t        |      # t        $ r Y "w xY wr   )r   KeyErrorr   s      r   _get_pending_resultz&BaseResultConsumer._get_pending_result/  sH    ,, 	Gw''	
 w  s   $	00c                    | j                   r| j                  |       |d   t        j                  v rW|d   }	 | j                  |      }|j	                  |       | j
                  }	 |j                  |      }|j                  |       t        d       y # t        $ r Y w xY w# t        $ r | j                  j                  ||       Y Bw xY w)Nstatusr   r   )r   r   READY_STATESr   r   rn   r   ry   r   r   putr   )r   metar   r   ro   rn   rp   s          r   r   z"BaseResultConsumer.on_state_change7  s    ??OOD!>V0009oG*11': ''-,,*$[[0F
 MM&)a     : &&**7D9:s#   B% (B 	B"!B"%%CCr   )NNr   )r8   r9   r:   r;   r   r!   r$   r)   r   r   r   r   r4   rz   r   r   r   r   r   r   r   r   r      sR    <	<$$$$9
 FJ( 7 r   r   )r;   r,   rE   r*   collectionsr   queuer   r   weakrefr   kombu.utils.compatr   celeryr   celery.exceptionsr	   celery.utils.threadsr
   __all__r   r   r   r=   rZ   rg   r   r   r   r   r   <module>r      s    *       % 1  * 3
  )  B,=g ,=^ **o * *  (,O , ,"X Xv^ ^r   