
    g8                        d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZ G d de      Z G d d      Z	 	 d#dZd$dZd ZeddfdZd Z	 	 d%dZd Zd Z d Z!d Z" G d d      Z#	 	 	 	 d&dZ$d Z%d Z&d  Z'd! Z( eee"      Z) ee%e"      Z* ee&e"      Z+ ee'e"      Z,y)'z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                       e Zd ZdZy)r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__     W/var/www/html/hubwallet-dev/venv/lib/python3.12/site-packages/celery/contrib/migrate.pyr   r      s    4r!   r   c                   2    e Zd ZdZdZdZdZed        Zd Z	y)r   zMigration progress state.r   c                 F    | j                   syt        | j                         S )N?)	total_apxstrselfs    r"   strtotalzState.strtotal&   s    ~~4>>""r!   c                 n    | j                   rd| j                    S | j                   d| j                   S )N^/)filteredcountr*   r(   s    r"   __repr__zState.__repr__,   s3    ==t}}o&&**Qt}}o..r!   N)
r   r   r   r   r/   r.   r&   propertyr*   r0   r    r!   r"   r   r      s+    #EHI# #
/r!   r   c                    |sg d}t        |j                        }|j                  |j                  |j                  }}}||d   n|}||d   n|}|j
                  |j                  }
}	|j                  dd      }|j                  dd      }|t        |      nd}|D ]  }|j                  |d         | j                  t        |      f|||||	|
|d| y)zRepublish message.)application_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression
expiration)r7   r8   r9   r6   r4   r5   r:   )
r	   bodydelivery_infor6   
propertiesr4   r5   popfloatpublish)producermessager7   r8   remove_propsr;   infor6   propsctypeencr9   r:   keys                 r"   r   r   2   s     7%D#11#OOW-?-? 'D#+#3tJH)4)<$}%+K%%w'?'?3E ++mT2K<.J&0&<z"$J 		#t H\$' (!,+$5&)j 	r!   c           	          |j                   }|i n|}t        | ||j                  |d         |j                  |d                y)zMigrate single task message.Nr7   r8   r7   r8   )r<   r   get)rA   body_rB   queuesrD   s        r"   r   r   P   sF      D>RvFhzz$z"23 **T-%89;r!   c                       fd}|S )Nc                 *    r| d   vry  | |      S Ntaskr    )r;   rB   callbacktaskss     r"   r.   z!filter_callback.<locals>.filtered[   s!    T&\.g&&r!   r    )rR   rS   r.   s   `` r"   filter_callbackrT   Y   s    '
 Or!   c                     t        |      }t              |j                  j                  |d      t	        |      }fd}t        || |f|d|S )z)Migrate tasks from one broker to another.F)auto_declarerM   c                     | j                         }j                  | j                  | j                        |_        |j                  | j                  k(  r+j                  | j                  |j                        |_        |j                  j                  | j                  k(  r5j                  | j                  | j                        |j                  _        |j                          y N)channelrK   namer8   r7   declare)queue	new_queuerA   rM   s     r"   on_declare_queuez'migrate_tasks.<locals>.on_declare_queuek   s    (**+	EJJ

;	  EJJ.$*JJuzz/8/D/D%FI!""ejj0&,jjUZZ&HI#r!   )rM   r_   )r
   prepare_queuesamqpProducerr   r   )sourcedestmigrateapprM   kwargsr_   rA   s       `  @r"   r   r   c   sp     
CF#Fxx  E :Hgx7G VW EV)9E=CE Er!   c                 X    t        |t              r| j                  j                  |   S |S rY   )
isinstancer'   ra   rM   )rf   qs     r"   _maybe_queuerk   y   s$    !Sxxq!!Hr!   c	           
      N    t        |      }|xs g D 
cg c]  }
t        ||
       c}
xs d}|j                  |d      5 |j                  j	                        t                f	d}t        ||fd|i|	cddd       S c c}
w # 1 sw Y   yxY w)aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    NF)poolc                   	  	| |      }|rr |      }t        |t              r9t        |j                         |j                  j
                  |j                  }}nt        |      \  }}t        
|||       |j                          xj                  dz  c_
        r
 | |       rj                  k\  r
t               y y y )NrJ      )ri   r   r   default_channelr7   r[   r8   expand_destr   ackr.   r   )r;   rB   retexrkrR   connr7   limit	predicaterA   r8   state	transforms        r"   on_taskzmove.<locals>.on_task   s    D'*C#C.Cc5)!#t';';< \\..B(hDFB(G#%27!#UD'2U^^u4'/) 55 r!   consume_from)r
   rk   connection_or_acquirera   rb   r   r   )rx   
connectionr7   r8   rc   rf   rR   rw   rz   rg   r]   rM   r{   rv   rA   ry   s   ` ``  ```    @@@r"   r   r      s    | 
C4:LbA5l3&AITF		"	":E	"	: Od88$$T*	* 	*( CwNVNvN1O O BO Os   B
ABB$c                 N    	 | \  }}||fS # t         t        f$ r
 ||}}Y ||fS w xY wrY   )	TypeError
ValueError)rs   r7   r8   rt   ru   s        r"   rq   rq      sC    'B r6M z" ';Br6M's    $$c                     |d   | k(  S )z'Return true if task id equals task_id'.idr    )task_idr;   rB   s      r"   r   r      s    :  r!   c                     |d   | v S )z-Return true if task id is member of set ids'.r   r    )idsr;   rB   s      r"   r   r      s    :r!   c                     t        | t              r| j                  d      } t        | t              rt	        d | D              } | i } | S )N,c           
   3   x   K   | ]2  }t        t        t        |j                  d             dd             4 yw):N   )tupler   r   split).0rj   s     r"   	<genexpr>z!prepare_queues.<locals>.<genexpr>   s3      ' F5#6a@A 's   8:)ri   r'   r   listdictrW   s    r"   r`   r`      sJ    &#c"&$ '%' '~Mr!   c                   @    e Zd Z	 	 	 	 d	dZd Zd Zd Zd Zd Zd Z	y)
FiltererNc                    || _         || _        || _        || _        || _        || _        t        t        |      xs g       | _        t        |      | _
        |	| _        |
| _        || _        |xs t        | j                        D cg c]  }t        | j                   |       c}| _        |xs
 t#               | _        || _        y c c}w rY   )rf   rv   filterrw   timeoutack_messagessetr   rS   r`   rM   rR   foreverr_   r   rk   r|   r   ry   accept)r)   rf   rv   r   rw   r   r   rS   rM   rR   r   r_   r|   ry   r   rg   rj   s                    r"   __init__zFilterer.__init__   s    
 	
(U+1r2
$V,  0 "6T$++%6
 1%
 %eg

s   Cc                 D   | j                  | j                               5  	 t        | j                  | j                  | j
                        D ]  } 	 d d d        | j                  S # t        j                  $ r Y )t        $ r Y 3w xY w# 1 sw Y   | j                  S xY w)N)r   ignore_timeouts)	prepare_consumercreate_consumerr   rv   r   r   socketr   ry   )r)   _s     r"   startzFilterer.start  s    ""4#7#7#9: 		"499+/<<37<<A A 		 zz	 >>   		 zzs:   B1A(B(B;B=BBBBBc                     | j                   xj                  dz  c_        | j                  r.| j                   j                  | j                  k\  r
t               y y )Nro   )ry   r/   rw   r   r)   r;   rB   s      r"   update_statezFilterer.update_state  sB    

A::$****djj8/! 9:r!   c                 $    |j                          y rY   )rr   r   s      r"   ack_messagezFilterer.ack_message#  s    r!   c                     | j                   j                  j                  | j                  | j                  | j
                        S )N)rM   r   )rf   ra   TaskConsumerrv   r|   r   r(   s    r"   r   zFilterer.create_consumer&  s9    xx}}))II$$;; * 
 	
r!   c                 \   | j                   }| j                  }| j                  }| j                  rBt	        || j                        }t	        || j                        }t	        || j                        }|j                  |       |j                  |       | j                  r|j                  | j                         | j                  St        | j                  | j                        }| j                  rt	        || j                        }|j                  |       | j                  |       |S rY   )r   r   r   rS   rT   register_callbackr   rR   r   ry   declare_queues)r)   consumerr   r   r   rR   s         r"   r   zFilterer.prepare_consumer-  s    ((&&::$VTZZ8F*<DL)+tzzBK""6*""<0&&t'7'78==$t}}djj9Hzz*8TZZ@&&x0H%r!   c                 x   |j                   D ]  }| j                   r|j                  | j                   vr(| j                  | j                  |       	  ||j                        j	                  d      \  }}}|r| j
                  xj                  |z  c_         y # | j                  j                  $ r Y w xY w)NT)passive)	rM   r[   r_   rZ   queue_declarery   r&   rv   channel_errors)r)   r   r]   r   mcounts        r"   r   zFilterer.declare_queuesA  s    __ 	E{{uzz<$$0%%e,$$$ &&3mDm&A 61JJ((F2(	 99++ s   ABB98B9Ng      ?FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r    r!   r"   r   r      s3     &)8<@D7;	."

(r!   r   c                 R    t        | ||f|||||||	|
|||d|j                         S )zFilter tasks.)rw   r   r   rS   rM   rR   r   r_   r|   ry   r   )r   r   )rf   rv   r   rw   r   r   rS   rM   rR   r   r_   r|   ry   r   rg   s                  r"   r   r   Q  sQ    
 T6!)!  %'r!   c                     t        | |ifi |S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )r   rd   rg   s      r"   r   r   f  s     '43F33r!   c                 <      fd}t        |fdt               i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                 @    j                  |j                  d         S )Ncorrelation_id)rK   r=   r;   rB   maps     r"   task_id_in_mapz%move_by_idmap.<locals>.task_id_in_map  s    www))*:;<<r!   rw   )r   len)r   rg   r   s   `  r"   r   r   t  s#    =
 9c#h9&99r!   c                 &      fd}t        |fi |S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                 ,    j                  | d         S rP   )rK   r   s     r"   task_name_in_mapz)move_by_taskmap.<locals>.task_name_in_map  s    wwtF|$$r!   )r   )r   rg   r   s   `  r"   r   r     s    %  +F++r!   c                 F    t        t        j                  d| |d|       y )N)ry   r;   r    )printMOVING_PROGRESS_FMTformat)ry   r;   rB   rg   s       r"   filter_statusr     s     	

$
$
F5t
Fv
FGr!   )rz   )NNNrY   )NNNNNNNNr   )-r   r   	functoolsr   	itertoolsr   r   kombur   r   kombu.commonr   kombu.utils.encodingr	   
celery.appr
   celery.utils.nodenamesr   celery.utils.textr   __all__r   	Exceptionr   r   r   r   rT   r   rk   r   rq   r   r   r`   r   r   r   r   r   r   r   r   move_direct_by_idmapmove_direct_by_taskmapr    r!   r"   <module>r      s   2   # " & - % 0 ) 5I 5/ /& =A<; )5$E, AEEIXOv!

W Wt 9<8<@D7;*4:(,"H dm4O}E }F  MJ r!   