
    /i;                        d dl mZmZ d dlmZ d dlZd dlmZ d dlm	Z	m
Z
mZmZ d dlmZ d dlmZmZ d dlZd dlZd dlmZ d dlZd d	lmZ  e        ej2                  Zej4                  Zej6                  Zej8                  Zej:                  Z ej<                  eeeed
      Zdej@                   dejB                   dejD                   Z# eddd      Z$e$jJ                  jM                  ddgddd
d
dddd
       e$jO                  g d       e$jQ                  d
      d        Z)e$jQ                  d
      d        Z*e$jQ                  d
      d        Z+e$jQ                  d
      d        Z,y)    )Celerysignals)SessionN)SessionLocal)create_task_entryupdate_task_statusextract_review_detailsgenerate_recommendations_task)store_reviews)analyze_sentimentanalyze_batch_sentiment)settings)load_dotenvT)app_idkeysecretclustersslzredis://:/celery_workerzredis://localhost:6379/0)brokerbackendjsonUTCi  i     i  )
task_serializeraccept_contentresult_serializertimezone
enable_utctask_track_startedtask_time_limittask_soft_time_limitworker_prefetch_multiplierworker_max_tasks_per_child)zsrc.marketing.taskszsrc.menu_design.taskszsrc.smart_inventory.tasksz	src.utils)bindc                    t               }	 t        d| j                  j                   dt	        |       d       t        |D cg c]  }|d   	 c}|d         }t	        |      t	        |      k(  rt        |      D ]  \  }}t        |t              r|d   }|||   d<   t        d| j                  j                   d|d	z    d
|d           t        d| j                  j                   dt        |d   j                                        |d   |d   |d   |d   |d   |d}t        ||       	 |j                  d      rbt        |j                  dd	      d	      }	t        |	d	z
  t        z  t	        |      z   |d         }
t!        |
|d   z  dz        }t        |d      }ndx}
}d}t"        j%                  |d|d   |
|d   ||dk  rdnd|dk  rd|
 d|d    dndd       t        d| j                  j                   d | d!| d"       t        d| j                  j                   d$       n;t        d| j                  j                   d%t	        |       d&t	        |       d       |j-                          t        d| j                  j                   d+       y c c}w # t&        $ r8}t        d| j                  j                   d#t)        |              Y d }~d }~ww xY w# t&        $ r}t        d| j                  j                   d't)        |              |j+                          	 t"        j%                  dd|j                  d      d|j                  dd      dd(d)d       nD# t&        $ r8}t        d| j                  j                   d*t)        |              Y d }~nd }~ww xY wY d }~Nd }~ww xY w# |j-                          t        d| j                  j                   d+       w xY w),N[z] Processing batch of  reviews...commentstore_idr   sentiment_result	] Review r    Sentiment: 	sentiment
] Topics: relevant_topics_sentiment	branch_idsnapshot_iddatasource_iddatasource_sourcer,   r3   r4   r5   r6   reviewstotal_recordsbatch_indexd   import-updateprogress-update
processing	completedz
Analysing  of  reviews	Completedr5   	processedtotalpercentstatus
status_msg
event_namedata] Pusher notification sent:  - %] Pusher error: ] Batch stored successfully.   ] ❌ Mismatch:  results vs z!] Error during batch processing: failedBatch failed(] Pusher error in failure notification: ] DB session closed.)r   printrequestidlenr   	enumerate
isinstancetuplelistkeysr   getmaxmin
batch_sizeroundpusher_clienttrigger	Exceptionstrrollbackcloseselfr8   metadatadbreviewresultsiresultbatch_payloadr:   rD   pctchannel_namee
pusher_errs                  6/var/www/html/hubwallet-dev/src/utils/celery_worker.pyanalyze_review_batchry   @   s   	BW9$,,//""8WkRS)-456VI5Z 

 w<3w<'&w/ i	6fe,#AYF17
-.$,,//*)AaC5VKEXDYZ[$,,//**T&A\:]:b:b:d5e4fghi %Z0%k2'6!)/!:%-.A%B"M "m,E<<0"%hll=!&Da"HK #kAo%Cc'l%RU]^mUn oIXo-F!F# MNCc3-C&''I!.%% 0)1/)B%.!)/!:#&25),jmpsjs
9+T(?B[A\\d&e  zE &  $,,//**F|nTWX[W\\]^_ Adlloo&&BCD Adlloo&&6s7|nLQTU\Q]P^^fgh4 	
$,,//""678g 6d  E$,,//**:3q6(CDDE  b$,,//""CCF8LM
	b!!,%-\\/%B!"%\\/1= &"0 "   	bAdlloo&&NsS]N_`aa	b%b0 	
$,,//""678s   8K0 J'C$K0 5CJ, AK0 'K0 ,	K-5.K(#K0 (K--K0 0	O9>O8>M76O7	N8 .N3.O3N88O;O	 OO	 	5O>c                 |   t               }	 t        d| j                  j                   d       t	        || j                  j                  |d   |d   |d   |d          |j                          t        d| j                  j                   d       |d   |d   |d   |d   |d   t        |d	         d
}|d	   }t        |      t        z   dz
  t        z  }t        d| j                  j                   dt        |       d| d       t        t        dt        |      t                    D ]  \  }}|||t        z    }t        d| j                  j                   d|dz    d| d       i |d|dz   i}	t        ||	       t        d |D              }
|
dk  rdnt        dt        |
dz              }t        d| j                  j                   d| d       t        j                  |        t!        | j                  j                  d       |j                          t        d| j                  j                   d       t#        |d          |j1                          t        d| j                  j                   d$       y # t$        $ r}t        d| j                  j                   dt'        |              t!        | j                  j                  d       |j)                          	 t*        j-                  dd|j/                  d      dt        |j/                  d	g             ddd d!"       nD# t$        $ r8}t        d| j                  j                   d#t'        |              Y d }~nd }~ww xY wY d }~.d }~ww xY w# |j1                          t        d| j                  j                   d$       w xY w)%Nr)   %] Creating parent task entry in DB...r,   r3   r4   r5   z] Parent task entry created.source_typer8   r,   r3   r4   r5   r6   r9   r   z] Sequentially processing z reviews in  batches...r   ] Processing batch r@   ...r:   c              3   8   K   | ]  }t        |d            ywr+   NrZ   .0rs     rx   	<genexpr>z*process_csv_review_data.<locals>.<genexpr>        !C#a	l"3!C            ] Sleeping z+s before next batch to avoid rate limits...r?   z6] All batches processed. Generating recommendations...z] Error in parent task: rS   r<   r=   Processing failedrC   rI   /] Pusher error in parent failure notification:  ] Parent task DB session closed.)r   rW   rX   rY   r   commitrZ   rc   r[   rangery   ra   rb   rd   timesleepr   r
   rg   rh   ri   re   rf   r`   rj   )rl   payloadrn   rm   r8   total_batchesr:   rq   batchbatch_metadatamax_comment_lendelayrv   rw   s                 rx   process_csv_review_datar      sl   	BJE$,,//""GHIJK M"O$	
 			$,,//"">?@  
+ -"=1$_5!(!7 !34
 )$W
2Q6:E$,,//""<S\N,WdVeepqr'aWz(JK 	NKAa
l+EAdlloo&&9+/9J$}o]`ab HG-QGN !7 "!CU!CCO(C/ASU?UWCW=X5YEAdlloo&k%8cdeJJu	" 	4<<??K8
		$,,//""XYZ%gk&:;6 	
$,,//""BCD5  i$,,//"":3q6(CD4<<??H5
	i!!,%,[[%A!" Y!;< &"5 "   	iAdlloo&&UVYZdVeUfghh	i'i2 	
$,,//""BCDsV   IJ 	NAM>,AL43M>4	M5=.M0+M>0M55M>8N >NN 5N;c                    t               }	 t        d| j                  j                   dt	        |       d       t        |D cg c]  }|d   	 c}|d         }t	        |      t	        |      k7  rt        d| j                  j                   dt	        |       dt	        |       d       t        |      D ]B  \  }}t        d| j                  j                   d	|d
z    d|j                  d      d d         D t        d      t        |      D ]  \  }}t        |t              r|d   }|||   d<   t        d| j                  j                   d	|d
z    d|d           t        d| j                  j                   dt        |d   j                                        |d   |d   |d   |d   |d   |d}t        ||       	 |j                  d      rbt        |j                  dd
      d
      }	t        |	d
z
  t         z  t	        |      z   |d         }
t#        |
|d   z  dz        }t        |d      }ndx}
}d}t$        j'                  |d|d   |
|d   ||dk  rdnd|dk  rd |
 d!|d    dnd"d#$       t        d| j                  j                   d%| d&| d'       t        d| j                  j                   d)       |j/                          t        d| j                  j                   d.       y c c}w # t(        $ r8}t        d| j                  j                   d(t+        |              Y d }~d }~ww xY w# t(        $ r}t        d| j                  j                   d*t+        |              |j-                          	 t$        j'                  dd|j                  d      d|j                  dd      dd+d,d#$       nD# t(        $ r8}t        d| j                  j                   d-t+        |              Y d }~nd }~ww xY wY d }~Nd }~ww xY w# |j/                          t        d| j                  j                   d.       w xY w)/Nr)   z"] Processing Bright Data batch of r*   r+   r,   rQ   rR   rA   r.   r   z: 2   z4Mismatch in number of reviews and sentiment results.r   r-   r/   r0   r1   r2   r3   r4   r5   r6   r7   r9   r:   r;   r<   r=   r>   r?   z
Analyzing r@   rB   rC   rI   rL   rM   rN   rO   rP   z-] Error during Bright Data batch processing: rS   rT   rU   rV   )r   rW   rX   rY   rZ   r   r[   r`   
ValueErrorr\   r]   r^   r_   r   ra   rb   rc   rd   re   rf   rg   rh   ri   rj   rk   s                  rx   analyze_bright_review_batchr      sc   	BV9$,,//""DS\NR]^_)-456VI5Z 

 w<3w<'Adlloo&&6s7|nLQTU\Q]P^^fgh&w/ Y	6$,,//*)AaC56::i;PQTRT;U:VWXYSTT"7+ 	eIAv&%(-3GAJ)*Adlloo&iAwl6+CVBWXYAdlloo&jf=X6Y6^6^6`1a0bcd	e !,!+.#M2%o6!)*=!>
 	b-(	A||O,!(,,}a"@!D+/Z!?#g,!NQYZiQjk	Y/)BBcIJ#sm"##	C*L!!,%-o%>!*%o6".1Cil[ehknenJykh>W=XX`"at "  Adlloo&&B<.PSTWSXXYZ[ 	$,,//"">?@4 	
$,,//""678e 6h  	AAdlloo&&6s1vh?@@	A
  b$,,//""OPSTUPVxXY
	b!!,%-\\/%B!"%\\/1= &"0 "   	bAdlloo&&NsS]N_`aa	b%b0 	
$,,//""678s   8M	 L E9M	 
CL )#M	  M	 	M.M<M	 MM	 		P>P>OP	P.PPPPP" PP" "5Qc                    t               }i }	 t        d| j                  j                   d       t	        |      }|d   }|d   }|d   }|d   }|d   }t        d| j                  j                   d       t        || j                  j                  ||||       |j                          t        d| j                  j                   d	       |d
   }	t        |	      }
|
t        z   dz
  t        z  }t        d| j                  j                   d|
 d| d       ||||||
d}t        t        d|
t                    D ]  \  }}|	||t        z    }t        d| j                  j                   d|dz    d| d       i |d|dz   i}t        ||       t        d |D              }|dk  rdnt        dt        |dz              }t        d| j                  j                   d| d       t        j                   |        t#        | j                  j                  d       |j                          t        d| j                  j                   d       t%        |       |j3                          t        d| j                  j                   d%       y # t&        $ r}t        d| j                  j                   dt)        |              t#        | j                  j                  d       |j+                          	 t,        j/                  dd |j1                  d      dt        |j1                  d
g             ddd!d"#       nD# t&        $ r8}t        d| j                  j                   d$t)        |              Y d }~nd }~ww xY wY d }~.d }~ww xY w# |j3                          t        d| j                  j                   d%       w xY w)&Nr)   z+] Starting Bright Data review processing...r,   r3   r4   r5   r6   r{   z] Task entry created.r8   r   z] Processing z Bright Data reviews in r~   r}   r   r   r@   r   r:   c              3   8   K   | ]  }t        |d            ywr   r   r   s     rx   r   z-process_bright_review_data.<locals>.<genexpr>z  r   r   r   r   r   r   zs before next batch...r?   zB] All Bright Data batches processed. Generating recommendations...z$] Error in Bright Data parent task: rS   r<   r=   r   rC   rI   r   r   )r   rW   rX   rY   r	   r   r   rZ   rc   r[   r   r   ra   rb   rd   r   r   r   r
   rg   rh   ri   re   rf   r`   rj   )rl   bright_datarn   r   r,   r3   r4   r5   r6   r8   r9   r   base_metadatar:   rq   r   r   r   r   rv   rw   s                        rx   process_bright_review_datar   N  s   	BGME$,,//""MNO )5:&K(	m,0#$78$,,//""GHI"dllooxKQ^_
		$,,//""789)$G&3a7JF$,,//"->VWdVeepqr !"&*!2*
 (a
(KL 	NKAa*n-EAdlloo&&9+/9J$}o]`abLL}k!mLN'~> "!CU!CCO(C/ASU?UWCW=X5YEAdlloo&k%8NOPJJu	 	4<<??K8
		$,,//""def%i06 	
$,,//""BCD5  i$,,//""Fs1vhOP4<<??H5
	i!!,%,[[%A!" Y!;< &"5 "   	iAdlloo&&UVYZdVeUfghh	i'i2 	
$,,//""BCDsV   IJ 	NAN;AMN	N.M?:N?NNN NN 5O
)-celeryr   r   sqlalchemy.ormr   uuidsrc.utils.dbr   src.utils.tasksr   r   r	   r
   src.apps.feedback.servicesr   src.core.sentiment_analysisr   r   r   pushersrc.utils.settingsr   osdotenvr   r   r   r   r   rc   Pusherre   
REDIS_HOST
REDIS_PORTREDIS_DB	redis_url
celery_appconfupdateautodiscover_taskstaskry   r   r   r        rx   <module>r      s   " "  % x x 4 R   ' 	   
ll	


  
  x**+1X-@-@,A8CTCTBUV	%&
   8  #        dY9 Y9| dLE LE` dX9 X9z dPE PEr   