
I have a DAG task called check_file that is responsible for checking whether a file exists in a remote location.

check_file = PythonOperator(
    task_id='check_file',
    python_callable=check_file,
    on_failure_callback=error_handler,
    op_kwargs={'config_file': configfile},
    )
def check_file(**kwargs):
    
    kwargs.clear()
    sharepointconfig = kwargs.get('config_file')
    kwargs['poke_interval'] = kwargs.get('poke_interval', 60)
    kwargs['timeout'] = kwargs.get('timeout', 120)
    kwargs['retries'] = kwargs.get('retries', 0)
    kwargs['soft_fail'] = kwargs.get('soft_fail', False)
    kwargs['mode'] = kwargs.get('mode', 'reschedule')
    logging.info(f'parameters passed for sensor are {kwargs}')
    with CustomFileSensor(sharepointconfig, **kwargs) as mysensor:
        ret_value = mysensor.poke()
class CustomFileSensor(BaseSensorOperator):
    def __init__(self, sharepointconfigfile, **kwargs):
        self.sharepointconfigfile = sharepointconfigfile
        kwargs['poke_interval'] = kwargs.get('poke_interval', 60)
        kwargs['timeout'] = kwargs.get('timeout', 120)
        kwargs['retries'] = kwargs.get('retries', 0)
        kwargs['soft_fail'] = kwargs.get('soft_fail', False)
        kwargs['mode'] = kwargs.get('mode', 'reschedule')
        logging.info(f'parameters passed for sensor are {kwargs}')
        super(BaseSensorOperator, self).__init__(**kwargs)

    def poke(self, context):
        flag = self.check_flag(self.sharepointconfigfile)
        if flag == 'sftp':
            response = self.check_sftp_files(self.sharepointconfigfile)
            if response:
                return True
        elif flag == 'sp':
            response = self.check_sp_files(self.sharepointconfigfile)
            if response:
                return True
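Independently of the python_callable error, two details in check_file can be checked in plain Python: `kwargs.clear()` runs before `kwargs.get('config_file')`, so `sharepointconfig` is always `None`, and `poke` is declared as `poke(self, context)` but called as `mysensor.poke()` with no argument. The stand-ins below (a trimmed `check_file` and a hypothetical `DemoSensor`) only reproduce those two lines; they are not Airflow classes.

```python
def check_file(**kwargs):
    # Same first two statements as the callable in the question.
    kwargs.clear()  # empties the dict, so everything passed in is discarded
    return kwargs.get('config_file')  # therefore always None


class DemoSensor:
    # Hypothetical stand-in: only the poke signature matters here.
    def poke(self, context):
        return True


print(check_file(config_file='sharepoint_config.yaml'))  # None

try:
    DemoSensor().poke()  # called without the required context argument
except TypeError as exc:
    print(f"TypeError: {exc}")
```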

When I run this, I get the following error:

File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 90, in wrapper
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['python_callable'] is required

Is there anything wrong with calling a sensor operator inside the python_callable of a PythonOperator?
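For comparison, one way to keep the check inside a PythonOperator without constructing a sensor object at all is to poll directly in the callable. This is a minimal sketch, not Airflow's sensor implementation: `wait_for_file`, `check_remote_file` and `check_file_callable` are hypothetical names, and `check_remote_file` is a placeholder for the SFTP/SharePoint logic in `check_sftp_files`/`check_sp_files`. Unlike a sensor in `reschedule` mode, this loop holds the worker slot for the whole wait.

```python
import logging
import time


def wait_for_file(check, poke_interval=60, timeout=120):
    """Call check() until it returns True or `timeout` seconds elapse.

    Blocks for the whole wait, so it behaves like a sensor in 'poke'
    mode rather than 'reschedule' mode.
    """
    deadline = time.monotonic() + timeout
    attempt = 0
    while True:
        attempt += 1
        if check():
            logging.info("file found on attempt %d", attempt)
            return True
        # Give up if the next poke would land at or past the deadline.
        if time.monotonic() + poke_interval >= deadline:
            raise TimeoutError(f"file not found within {timeout} seconds")
        time.sleep(poke_interval)


def check_remote_file(config_file):
    """Hypothetical placeholder for the SFTP/SharePoint check."""
    return True


def check_file_callable(config_file):
    # op_kwargs={'config_file': ...} arrives here as a keyword argument.
    return wait_for_file(lambda: check_remote_file(config_file),
                         poke_interval=60, timeout=120)
```

Raising `TimeoutError` fails the task, so the existing `on_failure_callback=error_handler` would still fire.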

Initially, I defined the task like this:

check_file = CustomFileSensor(
    task_id='check_file',
    sharepointconfigfile=sharepointconfigfile,
)

This worked perfectly fine, but the problem was that the sensor class was also being initialized in other tasks. That became a real problem when one of those tasks failed, because the error it raised looked like a sensor problem (for example, file not found). For instance, the sensor gets initialized in the prepare_files task, which is downstream of check_file.
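Airflow loads the whole DAG file when it runs any single task, so every operator constructed at module level has its `__init__` executed in every task's process, including prepare_files. The sketch below simulates that with `exec` on a tiny source string; `FakeFileSensor`, `DAG_FILE_SOURCE` and `run_task` are hypothetical and only stand in for the real DAG file and task runner.

```python
# Hypothetical, heavily simplified stand-in for a DAG file.
DAG_FILE_SOURCE = '''
class FakeFileSensor:
    def __init__(self, task_id):
        sensors_built.append(task_id)  # stands in for the sensor's __init__ side effects

check_file = FakeFileSensor(task_id="check_file")
prepare_files = "downstream task that never uses the sensor"
'''


def run_task(task_id):
    """Simulate one task process: parse the whole file, then run only task_id."""
    sensors_built = []
    namespace = {"sensors_built": sensors_built}
    exec(DAG_FILE_SOURCE, namespace)  # all module-level code runs, including constructors
    print(f"running {task_id}; sensors constructed while parsing: {sensors_built}")
    return sensors_built


run_task("prepare_files")
# running prepare_files; sensors constructed while parsing: ['check_file']
```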

I also tried making CustomFileSensor inherit from PythonOperator as well:

class CustomFileSensor(BaseSensorOperator, PythonOperator):

but it keeps asking for python_callable. Even after passing the callable through op_kwargs, it doesn't work.
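One plain-Python detail may explain why the multiple-inheritance attempt demands python_callable: `super(BaseSensorOperator, self).__init__(**kwargs)` (as in the constructor above) starts the method lookup *after* BaseSensorOperator in the MRO. With bases `(BaseSensorOperator, PythonOperator)`, the next class is PythonOperator, so its `__init__` runs and requires `python_callable`, while BaseSensorOperator's own `__init__` is skipped entirely. The classes below are simplified stand-ins, not Airflow's real classes; in the actual traceback the error is an AirflowException raised from `airflow/utils/decorators.py`.

```python
class BaseOperator:
    # Stand-in: accepts and ignores extra keyword arguments.
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id


class BaseSensorOperator(BaseOperator):
    def __init__(self, poke_interval=60, **kwargs):
        self.poke_interval = poke_interval
        super().__init__(**kwargs)


class PythonOperator(BaseOperator):
    def __init__(self, python_callable=None, op_kwargs=None, **kwargs):
        if python_callable is None:
            raise ValueError("Argument ['python_callable'] is required")
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}
        super().__init__(**kwargs)


class CustomFileSensor(BaseSensorOperator, PythonOperator):
    def __init__(self, sharepointconfigfile, **kwargs):
        self.sharepointconfigfile = sharepointconfigfile
        # Lookup starts AFTER BaseSensorOperator, i.e. at PythonOperator.
        super(BaseSensorOperator, self).__init__(**kwargs)


print([cls.__name__ for cls in CustomFileSensor.__mro__])
# ['CustomFileSensor', 'BaseSensorOperator', 'PythonOperator', 'BaseOperator', 'object']

try:
    # A callable tucked inside op_kwargs is not the python_callable argument.
    CustomFileSensor('cfg', task_id='check_file',
                     op_kwargs={'python_callable': print})
except ValueError as exc:
    print(exc)  # Argument ['python_callable'] is required
```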
