Setting up a cron job in Google App Engine to periodically load data into BigQuery

It is often important to accept incoming data at regular intervals and then update the data warehouse with the new data. A cron job is perfect for this kind of task. I recently worked on such a task using Google Cloud Platform and did not want to spin up a dedicated Compute Engine instance just for this purpose; that would be a complete waste of resources. The other viable alternative is to use a Google App Engine instance and configure a cron job. That way I am not consuming a lot of resources and am paying only for what I use.

This particular task assumes that the incremental data will be made available in a Cloud Storage bucket as a CSV file and that the data will be inserted into the data warehousing platform at a fixed interval. This post describes the step-by-step process for achieving this and is easy to follow even if you are a beginner on cloud platforms in general.

Before you begin, create a new project in the Google Cloud Console, a Google Cloud Storage bucket, and a BigQuery table with the required schema for your dataset. You may also need to install the Google Cloud SDK and configure it. The following instructions were tested on a macOS Sierra notebook.
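If you prefer to do this setup from the terminal, the bucket, dataset, and table can also be created with the Cloud SDK command-line tools. Here is a minimal sketch; the bucket name, dataset name, table name, and schema are just the example values used later in this post, so substitute your own:

gsutil mb gs://gcp-test-bucket1                             # bucket names are globally unique
bq mk cloudStorage                                          # create the dataset
bq mk --table cloudStorage.test name:STRING,value:INTEGER   # example schema, adjust to your CSV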

To create a cron job on Google App Engine instance, follow these steps:

  1. Create a new directory by running mkdir ~/Desktop/newCronJob

  2. Move into this directory by typing cd ~/Desktop/newCronJob

  3. Create a new virtual environment by running virtualenv gcp-cron

  4. Activate this environment by running source gcp-cron/bin/activate

  5. Create a new directory to hold the required libraries for upload to App Engine. This is a required step since Google does not provide the google-cloud library in the App Engine runtime, so we need to 'vendor' it in with the app, just like any other pure-Python third-party library. Let's call this directory lib

  6. Create a new requirements.txt file and put google-cloud==0.19 in it. We specifically need this version since newer versions did not work with from google.cloud import bigquery, which is required to load data from Cloud Storage into the BigQuery table. At this point, you may also need to set up the environment by running gcloud init. See https://cloud.google.com/ml-engine/docs/quickstarts/command-line for help.

  7. Install the requirements by running pip install -r requirements.txt -t lib. This lib directory will be uploaded to GAE and added to the Python path at runtime to provide the cloud library.

  8. For local testing, we also need these libraries to be available in the virtual environment. Install them by running pip install -r requirements.txt without specifying the target location. (I am a little confused here; is there a better way to make these libraries available to dev_appserver? I did not succeed by simply appending the path to lib, but then I did not try too hard.) A recap of the shell commands so far appears below.
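To recap, steps 1 through 8 amount to the following shell session (same directory and environment names as above):

mkdir ~/Desktop/newCronJob
cd ~/Desktop/newCronJob
virtualenv gcp-cron
source gcp-cron/bin/activate
mkdir lib
echo "google-cloud==0.19" > requirements.txt
pip install -r requirements.txt -t lib    # vendored copy, uploaded to GAE
pip install -r requirements.txt           # local copy for dev_appserver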

  9. Create app.yaml and put the following in the file:

runtime: python27
api_version: 1
threadsafe: false

handlers:
- url: /.*
  script: main.app
  10. Create a cron.yaml file and put the following in it:
cron:
- description: cron to BQ
  url: /cronjobs
  schedule: every 15 minutes from 10:20 to 11:20
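The schedule field uses App Engine's English-like cron syntax, so if the fixed window above does not fit your use case, other valid values include, for example:

schedule: every 24 hours
schedule: every day 09:00
schedule: every monday 17:00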
  11. Create appengine_config.py and put the following in it:
# appengine_config.py: import the libraries included in the 'lib' directory
from google.appengine.ext import vendor

# Add any libraries installed in the "lib" folder.
vendor.add('lib')

import os, sys

# This avoids an error on a local Windows machine; see
# https://stackoverflow.com/questions/41783864/importerror-no-module-named-ctypes-google-app-engine-with-bokeh-plot

# detect the local development server (SERVER_SOFTWARE starts with
# 'Development' there, but with 'Google App Engine' in production)
on_dev_server = os.environ.get('SERVER_SOFTWARE', '').startswith('Development')
if on_dev_server and os.name == 'nt':
    # pretend we are not on Windows so the sandbox does not choke on ctypes
    sys.platform = "Not Windows"
  12. Finally, create main.py and put the following in it:
import webapp2
import google
import os.path
import sys

# this is only for debugging, just to see if we can properly import the
# google.cloud libraries. Remove once done.
here = os.path.dirname(os.path.abspath(__file__))
here = os.path.relpath(google.__file__, here)
flag = 0

try:
    # if the google/ directory is in the directory /path/to/directory/
    #path_to_look_for_module = sys.path[0]+'/lib'

    #if not path_to_look_for_module in sys.path:
    #    sys.path.append(path_to_look_for_module)

    from os.path import dirname, realpath, sep, pardir
    import sys
    #sys.path.insert(0,dirname(__file__) + sep + "lib")
    sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))

    here = here + 'sys.path appended with ' + os.path.join(os.path.dirname(__file__), "lib")

    from google.cloud import bigquery
    flag = 1

    here = here + ' ...cloud import was successful'
except ImportError:
    if flag == 0:
        here = here + ' IMPORT NOT SUCCESSFUL'




#####################################################
### the main class to handle 'catch all' requests ###
#####################################################

class MainPage(webapp2.RequestHandler):
    def get(self):
        self.response.headers['Content-Type'] = 'text/plain'

        # these are debugging statements;
        # their output can be seen by visiting the app URL.
        # remove these statements once debugging is complete

        self.response.write('Hello, World!'+ here + '\n')
        self.response.write(google.__path__)
        self.response.write('\n' + 'flag:'+ str(flag))
        self.response.write('\n')
        self.response.write(dirname(__file__) + sep + "lib")

##############################################################
# cron handler class; it should return a status code between 200-299
# if successful. Otherwise, cron treats the run as a failure. This
# is why we need to subclass webapp2.RequestHandler
##############################################################

class UploadBQ(webapp2.RequestHandler):  # subclassing webapp2.RequestHandler is important

    def get(self):  # defining get(self) is important;
                    # cron jobs are essentially equivalent to hitting a URL
        from google.cloud import bigquery
        import uuid
        dataset_name = 'cloudStorage'                         # change this to your need
        table_name = 'test'                                   # change this to your need
        source = 'gs://gcp-test-bucket1/data.csv'             # change this to your need
        bigquery_client = bigquery.Client()
        dataset = bigquery_client.dataset(dataset_name)
        table = dataset.table(table_name)
        job_name = str(uuid.uuid4())

        job = bigquery_client.load_table_from_storage(job_name, table, source)

        job.begin()         # start the upload

        # in version 0.19, which is required for a successful import of
        # google-cloud, the job.result() method is not defined, so poll
        # the job state manually instead.
        # job.result(timeout=120)  # wait for completion - not available in 0.19

        import time
        retry_count = 120    # if your job needs more time, increase this
        while retry_count > 0 and job.state != 'DONE':
            retry_count -= 1
            time.sleep(10)
            job.reload()  # API call to refresh job.state

#################################
# invoke the app and define urls
#################################

app = webapp2.WSGIApplication([
    ('/', MainPage),             # for everything except cron, use this class
    ('/cronjobs', UploadBQ)      # cron jobs, which hit the '/cronjobs' url, must use this class
], debug=True)
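One caveat: as written, anyone who discovers the URL can hit /cronjobs and trigger a load. App Engine marks genuine cron requests with the X-Appengine-Cron header and strips that header from external requests, so a simple guard (my addition, not part of the code above) could look like this:

# inside UploadBQ.get(), before starting the load job
if self.request.headers.get('X-Appengine-Cron') != 'true':
    self.abort(403)    # reject anything that did not come from the cron service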
  13. Afterwards, run dev_appserver.py app.yaml to test your application locally. Go to http://localhost:8080 for the front end and http://localhost:8000 for the admin interface. Once your local version is working, you can deploy the app.

  14. Deploy both the app and the cron job configuration by running gcloud app deploy app.yaml cron.yaml and follow the terminal prompts.

Once deployed, go to your App Engine dashboard in the Google Cloud Console and visit the task queues page. From there, you should be able to see the task you just set up under the cron tab. If you want to test your cron job, just click the Run now button for the configured task. You can also go through the logs for your cron task. Remember to increase the timeout (the retry_count above) if your task is running out of time.
