Scheduling Triggers on ADF Pipelines

Recently our data engineering team needed a way to create, update and delete triggers on pipelines, or to run the pipelines on demand. Requests were ad hoc and the scheduled slots changed frequently, so developers needed to manage triggers themselves without going through multiple rounds of approval every time. We decided to create a few Azure Functions to help with that.

The basic code has been posted in a GitHub repo, so I won't dive into the functions themselves here as they are pretty straightforward, but let's look into the ADF SDK configuration.

Creating DataFactory Client

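// Requires the namespaces Microsoft.IdentityModel.Clients.ActiveDirectory (ADAL),
// Microsoft.Rest and Microsoft.Azure.Management.DataFactory.
private async Task<DataFactoryManagementClient> 
     GetDataFactoryManagementClient()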
{
    AuthenticationContext context = new AuthenticationContext(
        Environment.GetEnvironmentVariable("Azure_ContextUrl") +
        Environment.GetEnvironmentVariable("Azure_TenantId"));
    ClientCredential cc = new ClientCredential(
        Environment.GetEnvironmentVariable("ADF_ApplicationId"), 
        Environment.GetEnvironmentVariable("ADF_ApplicationSecret"));
    AuthenticationResult result = await context.AcquireTokenAsync(
        Environment.GetEnvironmentVariable("Azure_ManagementUrl"), cc);
    ServiceClientCredentials cred = new TokenCredentials(
        result.AccessToken);
    DataFactoryManagementClient client = 
            new DataFactoryManagementClient(cred)
            {
                SubscriptionId = 
                Environment.GetEnvironmentVariable("Azure_SubscriptionId")
            };
    return client;
}

The first thing we need is a DataFactory client. The code is fairly straightforward. I am currently using environment variables, but they could be replaced with any other mechanism, say Key Vault. The prerequisite is a registered application that has access to execute ADF pipelines.
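Since the client is built entirely from environment variables, a missing one tends to surface as an obscure authentication error. Below is a minimal sketch of a guard that fails early and names the missing variable. `Settings.GetRequired` is a hypothetical helper, not part of the ADF SDK or the original code.

```csharp
using System;

public static class Settings
{
    // Hypothetical helper: returns the value of an environment variable,
    // or throws with the variable's name if it is missing or blank.
    public static string GetRequired(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Required setting '{name}' is not configured.");
        }
        return value;
    }
}
```

With this in place, a call such as `Settings.GetRequired("Azure_TenantId")` could stand in for the direct `Environment.GetEnvironmentVariable` calls, and the same method body could later be swapped for a Key Vault lookup.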

Manual Trigger Runs

private async Task<string> RunAdHocPipeline(
    DataFactoryManagementClient client, ManualTriggerRequest inputRequest)
{
    AzureOperationResponse<CreateRunResponse> runResponse = 
        await client.Pipelines.CreateRunWithHttpMessagesAsync(
            inputRequest.ResourceGroup,
            inputRequest.DataFactoryName,
            inputRequest.PipelineName,
            parameters: inputRequest.PipelineParams);

    return runResponse.Body.RunId;
}

Once we have the authenticated client, we can access the Pipelines object it exposes and call the create-run method on it. If the run is triggered successfully, the response contains a RunId, which we can use later to query the status of the run (useful for long-running pipelines).
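Querying the status with the RunId usually means polling until the run leaves its in-flight states. The sketch below keeps the loop independent of the SDK: `RunPoller` is a hypothetical helper, and the status lookup is passed in as a delegate. For ADF, that delegate could wrap `client.PipelineRuns.GetAsync(...)` and return the run's `Status` string. The status names "Queued" and "InProgress" are assumed to be the only non-terminal ones.

```csharp
using System;
using System.Threading.Tasks;

public static class RunPoller
{
    // Polls getStatus until it reports a terminal state or maxAttempts is reached.
    // getStatus is supplied by the caller and returns the current status string.
    public static async Task<string> WaitForCompletionAsync(
        Func<Task<string>> getStatus, TimeSpan pollInterval, int maxAttempts)
    {
        string status = "Unknown";
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            status = await getStatus();
            // Assumption: anything other than these two states is terminal.
            if (status != "Queued" && status != "InProgress")
            {
                return status;
            }
            await Task.Delay(pollInterval);
        }
        // Still running after maxAttempts polls; return the last status seen.
        return status;
    }
}
```

Capping the number of attempts keeps an Azure Function from running past its timeout when a pipeline takes longer than expected; the caller can then hand the RunId back and let the client check again later.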

Scheduled Triggers

This is where things get interesting. It's still a lot of configuration rather than any complex code (thank heavens!). We need to first create a ScheduleTrigger object.

private ScheduleTrigger GetScheduledTrigger(
    ScheduledTriggerRequest request)
{
    ScheduleTrigger trigger = new ScheduleTrigger()
    {
        Pipelines = GetPipelineForScheduleTrigger(request.Pipelines),
        Recurrence = GetScheduledTriggerRecurrence(request)
    };
    return trigger;
}

It consists of two properties:

Pipelines

private List<TriggerPipelineReference> 
    GetPipelineForScheduleTrigger(List<Pipeline> pipelines)
{
    List<TriggerPipelineReference> pipelineReferences = 
        new List<TriggerPipelineReference>();
    foreach (Pipeline pipeline in pipelines)
    {
        TriggerPipelineReference reference =
            new TriggerPipelineReference()
            {
                PipelineReference = new PipelineReference(pipeline.Name)
            };
        if (pipeline.PipelineParams != null &&
            pipeline.PipelineParams.Count > 0)
        {
            reference.Parameters = pipeline.PipelineParams;
        }
        pipelineReferences.Add(reference);
    }
    return pipelineReferences;
}

Recurrence

The recurrence has a lot of variations, so let's look into the ScheduleTriggerRecurrence step by step. This object can be further divided into two parts:

Trigger properties

private ScheduleTriggerRecurrence GetScheduledTriggerRecurrence(
    ScheduledTriggerRequest request)
{
    ScheduleTriggerRecurrence recurrence = new ScheduleTriggerRecurrence()
    {
        TimeZone = "UTC"
    };
    // Normalize the optional dates to UTC; null values pass through unchanged.
    recurrence.StartTime = request.StartDate?.ToUniversalTime();
    recurrence.EndTime = request.EndDate?.ToUniversalTime();
    recurrence.Frequency = request.GetFrequency();
    recurrence.Interval = request.FrequencyInterval;
    if (request.Schedule != null)
    {
        recurrence.Schedule = GetRecurrenceSchedule(request);
    }
    return recurrence;
}
  • TimeZone : Kept at 'UTC', as is advisable for any date or time related transaction.
  • StartTime & EndTime : These determine when the trigger starts and ends. Both are optional: StartTime defaults to the current time, while EndTime can be null, which means the trigger runs indefinitely.
  • Frequency : The unit of time between runs of the pipeline. It can be minute, hour, day, week, month or year.
  • Interval : The number of frequency units between runs. For example, a frequency of hour with an interval of 2 runs the pipeline every two hours.
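To make the interplay of Frequency and Interval concrete, here is a rough sketch that previews the first few run times of a recurrence without a Schedule part. `RecurrencePreview.FirstRuns` is a hypothetical helper written for illustration. It only approximates the idea, and the service's own handling of edge cases (month ends, for instance) may differ.

```csharp
using System;
using System.Collections.Generic;

public static class RecurrencePreview
{
    // Returns the first `count` run times, starting at startUtc and
    // stepping by `interval` units of `frequency`.
    public static List<DateTime> FirstRuns(
        DateTime startUtc, string frequency, int interval, int count)
    {
        var runs = new List<DateTime>();
        for (int i = 0; i < count; i++)
        {
            // Always offset from the start time so month-end days do not drift.
            int steps = i * interval;
            switch (frequency.ToLowerInvariant())
            {
                case "minute": runs.Add(startUtc.AddMinutes(steps)); break;
                case "hour":   runs.Add(startUtc.AddHours(steps)); break;
                case "day":    runs.Add(startUtc.AddDays(steps)); break;
                case "week":   runs.Add(startUtc.AddDays(7 * steps)); break;
                case "month":  runs.Add(startUtc.AddMonths(steps)); break;
                case "year":   runs.Add(startUtc.AddYears(steps)); break;
                default:
                    throw new ArgumentException(
                        $"Unknown frequency '{frequency}'.", nameof(frequency));
            }
        }
        return runs;
    }
}
```

Calling `FirstRuns(start, "Hour", 2, 3)` with a start of midnight yields midnight, 02:00 and 04:00 on the same day.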

Trigger schedule properties

private RecurrenceSchedule GetRecurrenceSchedule(
    ScheduledTriggerRequest request)
{
    RecurrenceSchedule schedule = new RecurrenceSchedule();
    if (request.Schedule.Hours != null)
    {
        schedule.Hours = request.Schedule.Hours;
    }
    if (request.Schedule.Minutes != null)
    {
        schedule.Minutes = request.Schedule.Minutes;
    }
    if (request.Frequency.Equals(Constants.Schedule.Month, 
        StringComparison.InvariantCultureIgnoreCase))
    {
        if (request.Schedule.ScheduleRecurrences != null && 
            request.Schedule.ScheduleRecurrences.Count > 0)
        {
            schedule.MonthlyOccurrences = 
                GetMonthlyScheduleRecurrence(
                    request.Schedule.ScheduleRecurrences);
        }
        else
        {
            schedule.MonthDays = request.Schedule.MonthDays;
        }
    }
    else if (request.Frequency.Equals(Constants.Schedule.Week, 
        StringComparison.InvariantCultureIgnoreCase))
    {
        schedule.WeekDays = 
            GetScheduleWeekDays(request.Schedule.WeekDays);
    }
    return schedule;
}
  • Hours & Minutes : A pipeline can be made to run at several times of the day. We can pass arrays of integers (0-23 for hours and 0-59 for minutes). Note that when multiple values are supplied, the trigger fires for every combination of them, e.g. with hours 1, 2 and minutes 15, 30 the trigger runs four times: at 1:15, 1:30, 2:15 and 2:30.
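The "every combination" behaviour is a cross product of the two arrays. A small sketch can list the resulting times before a request is submitted, which helps catch a schedule that fires far more often than intended. `ScheduleExpansion` is a hypothetical helper and not part of the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScheduleExpansion
{
    // Pairs every hour with every minute and returns the times of day in order.
    public static List<TimeSpan> DailyRunTimes(
        IEnumerable<int> hours, IEnumerable<int> minutes)
    {
        var minuteList = minutes.ToList();
        return hours
            .SelectMany(h => minuteList.Select(m => new TimeSpan(h, m, 0)))
            .OrderBy(t => t)
            .ToList();
    }
}
```

For hours `{ 1, 2 }` and minutes `{ 15, 30 }` this returns four entries: 01:15, 01:30, 02:15 and 02:30.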

Monthly recurrence :

Monthly recurrences can be set in two ways:

  • Either by defining month days, so that the trigger runs on particular dates of the month.
  • Or by defining an occurrence, e.g. when we wish to run it on the first Sunday of the month, regardless of the date.
    private List<RecurrenceScheduleOccurrence> GetMonthlyScheduleRecurrence(
      List<ScheduleRecurrence> occurrences)
    {
      List<RecurrenceScheduleOccurrence> scheduleOccurrences =
          new List<RecurrenceScheduleOccurrence>();
      foreach (ScheduleRecurrence occurrence in occurrences)
      {
          RecurrenceScheduleOccurrence scheduleOccurrence =
              new RecurrenceScheduleOccurrence();
          // Skip entries without a day, so no empty occurrence is sent to the service.
          if (occurrence.Day.HasValue)
          {
              scheduleOccurrence.Day =
                  Enum.Parse<Microsoft.Azure.Management.DataFactory.Models.DayOfWeek>
                  (occurrence.Day.ToString());
              scheduleOccurrence.Occurrence = occurrence.Occurence;
              scheduleOccurrences.Add(scheduleOccurrence);
          }
      }
      return scheduleOccurrences;
    }
    
  • Day : Integer representation of the day of the week, in the range 0-6, 0 being Sunday.
  • Occurence : Integer representation of which occurrence of that day within the month is meant, e.g. 1 for the first Sunday.
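To see what a Day and Occurence pair resolves to in a given month, the sketch below computes the calendar date locally. `MonthlyOccurrence.NthWeekday` is a hypothetical helper. It uses `System.DayOfWeek`, which also counts Sunday as 0, and only handles positive occurrence values.

```csharp
using System;

public static class MonthlyOccurrence
{
    // Returns the date of the nth (1-based) given weekday in a month,
    // or null when the month has no such occurrence (e.g. a fifth Sunday).
    public static DateTime? NthWeekday(
        int year, int month, DayOfWeek day, int occurrence)
    {
        if (occurrence < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(occurrence));
        }
        var first = new DateTime(year, month, 1);
        // Days from the 1st of the month to the first matching weekday.
        int offset = ((int)day - (int)first.DayOfWeek + 7) % 7;
        int dayOfMonth = 1 + offset + 7 * (occurrence - 1);
        if (dayOfMonth > DateTime.DaysInMonth(year, month))
        {
            return null;
        }
        return new DateTime(year, month, dayOfMonth);
    }
}
```

For example, `NthWeekday(2024, 1, DayOfWeek.Sunday, 1)` gives 7 January 2024, while asking for a fifth Sunday in February 2024 returns null.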

Weekly recurrence :

    private List<DaysOfWeek?> GetScheduleWeekDays(List<int?> inputList)
    {
      List<DaysOfWeek?> weekdays = new List<DaysOfWeek?>();
      foreach (int? day in inputList)
      {
          if (day.HasValue)
          {
              DaysOfWeek daysOfWeek = Enum.Parse<DaysOfWeek>(day.ToString());
              weekdays.Add(daysOfWeek);
          }
      }
      return weekdays;
    }
    
Similar to the monthly case, a weekly recurrence is set by passing integers that represent the days of the week.
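One thing to watch with integer input is that `Enum.Parse` accepts a numeric string outside the defined range without throwing, so a value such as 9 would yield an undefined enum member. The sketch below validates the value first. `WeekDay` is a stand-in enum declared only for this example, assumed to follow the same Sunday-first ordering, and `WeekDayParser` is a hypothetical helper.

```csharp
using System;

// Stand-in enum for illustration; assumed to mirror a Sunday-first ordering.
public enum WeekDay
{
    Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday
}

public static class WeekDayParser
{
    // Converts an integer to a WeekDay and reports whether it is a defined member.
    public static bool TryParseDay(int value, out WeekDay day)
    {
        day = (WeekDay)value;
        return Enum.IsDefined(typeof(WeekDay), day);
    }
}
```

A caller could reject the request when `TryParseDay` returns false, rather than passing an out-of-range day on to the trigger definition.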

Once we have a ScheduleTrigger object, we can assign it to a TriggerResource and use the CreateOrUpdate method to create or update the trigger. To start executing the trigger, we call the Start method.

TriggerResource triggerResource = new TriggerResource()
{
    Properties = scheduleTrigger
};
await client.Triggers.CreateOrUpdateAsync(
    request.ResourceGroup, request.DataFactoryName, 
    request.TriggerName, triggerResource);

await client.Triggers.StartAsync(request.ResourceGroup, 
    request.DataFactoryName, request.TriggerName);