2023-01-19

AWS Redshift Data Import in Python

I'd done nothing with this blog for a long, long time, primarily because I changed jobs from a technical role to a hybrid one.

Now let's be quick: assume you have some large datasets that need to be loaded into AWS Redshift. Obviously the best practice from AWS is to load the file to an S3 bucket, then COPY the file from S3 into Redshift.
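
For reference, that standard route looks roughly like the sketch below. This is a minimal sketch, assuming boto3 is available and you actually have a writable bucket plus an IAM role attached to the cluster; the bucket, role ARN, cluster, and table names are placeholders:

import boto3
from sqlalchemy import create_engine

# 1. upload the flat file to S3 (hypothetical bucket/key)
s3 = boto3.client("s3")
s3.upload_file("data.csv", "my-bucket", "staging/data.csv")

# 2. COPY it into Redshift (hypothetical cluster, role ARN, and table)
engine = create_engine("postgresql://user:pass@my-cluster:5439/mydb")
engine.execute("""
    copy myschema.mytable
    from 's3://my-bucket/staging/data.csv'
    iam_role 'arn:aws:iam::123456789012:role/MyRedshiftRole'
    csv ignoreheader 1
""")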

Looks simple, huh? But what if you are just an analytics guy: would your IT department provision an S3 bucket for you? Would you be given COPY permission? Would your request be lodged as a Jira ticket and have to wait a full sprint?

So let's try another way. The maximum size of a single SQL statement in Redshift is 16 MB, so the workaround is to generate batch insert statements, e.g. INSERT INTO TABLE(col1, col2...) VALUES (val1, val2...), ... To control the statement size we need a pagination function. Once we have pagination, we want a configuration file for flexibility, and a log function would be great in case any issue happens. So below is the list of requirements:

  • source flat file
  • destination Redshift table name
  • configuration file to list source file delimiter, quotation, and source columns to destination column mapping
  • page size
For the configuration file, below is an example:

{
    "delimiter": ",",
    "quotation": "\"",
    "columns": [
        {
            "flat col1": "table col1",
            "flat col2": "table col2"
        }
    ]
}
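
With that mapping, each page the script generates becomes one batch statement shaped like insert into myschema.mytable(col1,col2) values ('a','b'),('c',NULL), ... (hypothetical names and values; empty fields in the csv end up as NULL).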

And the Python script is here:



#%%
from sqlalchemy import create_engine
import csv
import json
import logging
import os
import uuid
from datetime import datetime
from urllib.parse import quote

username = "" #<= your redshift username
password = "" #<= your redshift password
rsServer = "" #<= your redshift instance address

engineRedshift = create_engine("postgresql://{}:{}@{}".format(quote(username), quote(password), rsServer))

destSchema = "" #<= destination redshift schema
destTable = "" #<= destination redshift table
srcFile = "" #<= source file 
configFile = "" #<= configuration json file
pageSize = 10000 #<= page size

if (username == "") or (password == ""):
    raise Exception("Must provide username/password")
#%%
# provide a guid for each execution in the log file
uid = uuid.uuid4()

# write the log next to the source file: import_<source file name>_<yyyymmdd>.log
logFileName = os.path.join(
    os.path.dirname(srcFile),
    "import_{}_{}.log".format(
        os.path.splitext(os.path.basename(srcFile))[0],
        datetime.now().strftime("%Y%m%d")))

logging.basicConfig(
    filename=logFileName, 
    format="{} - %(asctime)s - %(message)s".format(uid), 
    level=logging.INFO
    )

#%%
logging.info("source file: {}".format(srcFile))
logging.info("destination table: {}.{}".format(destSchema, destTable))
logging.info("configuration file: {}".format(configFile))
logging.info("pagination setting: {}".format(pageSize))

#%%
# step 1: parse configuration to get delimiter, quotation, source columns and dest columns
with open(configFile, "r") as f:
    s = json.load(f)

delimiter = s["delimiter"]
quotation = s["quotation"]

lstSrc = []
lstDest = []

for key in s["columns"][0]:
    lstSrc.append(key)
    lstDest.append(s["columns"][0][key])

lstSrc = [c.lower() for c in lstSrc]
lstDest = [c.lower() for c in lstDest]
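# e.g. with the sample configuration above:
# lstSrc  = ["flat col1", "flat col2"]
# lstDest = ["table col1", "table col2"]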

#%%
logging.info("configuration delimiter: {}".format(delimiter))
logging.info("configuration quotation: {}".format("NA" if quotation == "" else quotation))
logging.info("configuration source columns: {}".format(lstSrc))
logging.info("configuration destination columns: {}".format(lstDest))

#%%
# step 2: all source columns should be in source csv file, read file header
# for extra columns in the source file, put column position in lstPop to drop
with open(srcFile, newline="") as flatFile:
    if quotation != "":
        reader = csv.reader(flatFile, delimiter=delimiter, quotechar=quotation)
    else: 
        reader = csv.reader(flatFile, delimiter=delimiter)
    header = next(reader)

header = [c.lower() for c in header]

lstPop = []
for c in [c for c in header if c not in lstSrc]:
    lstPop.append(header.index(c))

# lstPop

logging.info("Columns to Drop from Source: {}".format(lstPop))

#%%
# step 3: read dest table schema
# if a dest column from the configuration file is not in redshift, raise an error directly
# just like the MSSQL import wizard: if validation raises an error, simply stop the process
# note all destination columns need to be NULLABLE = YES, as we turn empty strings in the csv into NULL
sql = """select lower(column_name) as column_name, lower(is_nullable) as is_nullable
from information_schema.columns
where table_schema ilike '{}'
    and table_name ilike '{}'
order by ordinal_position""".format(destSchema, destTable)

lstR = [r for r in engineRedshift.execute(sql)]

lstCols = [r[0] for r in lstR]
for c in lstDest:
    if c not in lstCols:
        raise Exception("{} not in dest table".format(c))

lstNullable = [r[0] for r in lstR if r[1] == "no"]
if len(lstNullable) > 0:
    raise Exception("column(s) {} needs to be NULLable".format(",".join(lstNullable)))

logging.info("Verify destination columns: Passed")
#%%
# function to construct value list for insert statement
# fObj => file object
# lst => columns to drop
# min => start row
# max => end row
# d => delimiter
# q => quotation
def construct_string_sql_by_fObj(fObj, lst, min, max, d, q):
    value_list = ""

    try:
        if q != "":
            reader = csv.reader(fObj, delimiter=d, quotechar=q)
        else:
            reader = csv.reader(fObj, delimiter=d)
        page = [row for idx, row in enumerate(reader) if (idx >= min) and (idx < max)]

        for row in page:
            if len(lst) > 0:
                for i in sorted(lst, reverse=True):
                    del row[i]

            # escape embedded single quotes and turn empty strings into NULL
            # (doing it per value avoids corrupting values that happen to end with a quote)
            vals = ["NULL" if v == "" else "'" + v.replace("'", "''") + "'" for v in row]
            value_list += "(" + ",".join(vals) + "),"
    except Exception:
        logging.exception("failed to construct value list")
        return ""

    # drop the trailing comma
    return value_list[:-1]
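
# quick sanity check of the value-list builder (hypothetical two-row csv body,
# no quoting, nothing to drop):
# construct_string_sql_by_fObj(["a,b\n", "c,\n"], [], 0, 2, ",", "")
# returns: ('a','b'),('c',NULL)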


#%%
with open(srcFile, "r") as f:
    theFile = f.readlines()
conn = engineRedshift.connect()

for i in range(0, int(len(theFile)/pageSize) + 1):    
    min = i * pageSize + 1
    max = (i * pageSize + pageSize if i * pageSize + pageSize < len(theFile) else len(theFile)) + 1
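    # e.g. with pageSize = 10000, page 0 covers file lines 1..10000 (line 0 is the header)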
    
    vlst = construct_string_sql_by_fObj(theFile, lstPop, min, max, delimiter, quotation)
    
    # note: file row count != count of rows to insert because blank lines may be present;
    # pagination is calculated from the file row count, so once an empty page is found all remaining pages can be skipped
    if vlst == "":
        break

    stmt = "insert into {}.{}({}) Values".format(destSchema, destTable, ",".join(lstDest))
    stmt += vlst
    
    print("from {} to {}".format(min, max - 1))    
    # print(stmt)

    
    conn.execute(stmt)

    logging.info("Data import start: from {} to {}".format(min, max - 1))
    
conn.close()
print("import completed")

logging.info("File import completed\n\n\n\n")
        
# %%



Enjoy!




2019-12-19

Retrieve SharePoint Online List by Graph API

Last day in 2019, and last post in 2019.

Actually, pulling a SharePoint list from Power Query/Power BI is much faster and more straightforward. But in some cases you may still need to follow the traditional ETL way. Hopefully the script below helps if you need it.

Firstly we need to register the app in Azure AD (Check Here). After creating your app, write down 3 values: the tenant id, the client id, and the secret. Note the secret is only exposed to you when you create the app; after that it is masked (though you can always create a new one).


To allow your app to access the SharePoint list, you need to enable the "Site.Read.All" permission under the Application Permissions category. Keep in mind, though, that "Site.Read.All" requires an AAD admin's consent.

That is all for the app setup. To query the list, we first exchange our tenant id, client id, and secret for a token:

 public class RespToken
        {
            public string token_type { get; set; }
            public int expires_in { get; set; }
            public int ext_expires_in { get; set; }
            public string access_token { get; set; }
        }

        public static string GetAccessToken(string tenantId, string clientId, string clientSecret)
        {
            string token = string.Empty;

            string uriToken = string.Format("https://login.microsoftonline.com/{0}/oauth2/v2.0/token", tenantId);
            string contentType = "application/x-www-form-urlencoded";
            string grantType = "client_credentials";
            string scope = "https://graph.microsoft.com/.default";

            string tmpBody = "grant_type={0}&client_id={1}&client_secret={2}&scope={3}";
            string body = string.Format(tmpBody, grantType, clientId, clientSecret, scope);
            byte[] contents = Encoding.UTF8.GetBytes(body);

            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(uriToken);
            request.ContentType = contentType;
            request.Method = "POST";

            request.ContentLength = contents.Length;
            using (Stream writeStream = request.GetRequestStream())
            {
                writeStream.Write(contents, 0, contents.Length);
            }

            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            {
                using (Stream readStream = response.GetResponseStream())
                {
                    string responseBody = string.Empty;
                    using (StreamReader sr = new StreamReader(readStream))
                    {
                        responseBody = sr.ReadToEnd();
                    }
                    JavaScriptSerializer Serializer = new JavaScriptSerializer();
                    var r = Serializer.Deserialize<RespToken>(responseBody);
                    token = r.access_token;
                }
            }
            
            return token;
        }




Then we follow the routine "search site => search list => get list items":

static void Main(string[] args)
        {

            string tenantId = string.Empty;
            string clientId = string.Empty;
            string clientSecret = string.Empty;
            string siteName = string.Empty;

            string siteId = string.Empty;
            string listId = string.Empty;


            Console.WriteLine("Tenent Id: ");
            tenantId = Console.ReadLine();

            Console.WriteLine("Client Id: ");
            clientId = Console.ReadLine();

            Console.WriteLine("Client Secret: ");
            clientSecret = Console.ReadLine();

            if (string.IsNullOrEmpty(tenantId) || string.IsNullOrEmpty(clientId) || string.IsNullOrEmpty(clientSecret))
            {
                Console.WriteLine("empty value found in mandatory parameters");
                return;
            }

            string token = GetAccessToken(tenantId, clientId, clientSecret);
            Console.WriteLine("Token acquired, query site");
            

            Console.WriteLine("Input site name for search: ");
            siteName = Console.ReadLine();

            string uriSite = $"https://graph.microsoft.com/v1.0/sites?search={siteName}";

            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(uriSite);
            request.Method = "GET";
            request.Headers.Add(HttpRequestHeader.Authorization, "Bearer " + token);

            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            {
                using (Stream s = response.GetResponseStream())
                {
                    using (StreamReader r = new StreamReader(s))
                    {
                        string contents = r.ReadToEnd();
                        var j = JObject.Parse(contents);
                        siteId = j["value"]
                            .FirstOrDefault(x => x["name"].ToString().ToLower() == "msgraph")["id"]
                            .ToString();

                    }
                }
            }

            if (string.IsNullOrEmpty(siteId))
            {
                Console.WriteLine("error happens, cannot obtain site id");
                return;
            }


            Console.WriteLine($"site id acquired, {siteId}");
            string uriList = $"https://graph.microsoft.com/v1.0/sites/{siteId}/lists";
            request = (HttpWebRequest)WebRequest.Create(uriList);
            request.Method = "GET";
            request.Headers.Add(HttpRequestHeader.Authorization, "Bearer " + token);

            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            {
                using (Stream s = response.GetResponseStream())
                {
                    using (StreamReader sr = new StreamReader(s))
                    {
                        string content = sr.ReadToEnd();
                        var jObject = JObject.Parse(content);

                        listId = jObject["value"]
                            .FirstOrDefault(x => x["name"].ToString().ToLower() == "events")["id"]
                            .ToString();
                    }
                }
            }

            if (string.IsNullOrEmpty(listId))
            {
                Console.WriteLine("error occurred, cannot obtains list id");
                return;
            }

            Console.WriteLine($"list id: {listId}");

            Console.ReadLine();

        } 
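
The console app stops at the list id; the remaining hop is GET https://graph.microsoft.com/v1.0/sites/{siteId}/lists/{listId}/items?expand=fields with the same Authorization header. If you'd rather sanity-check the whole flow end to end, here is the same routine sketched in Python with requests; the tenant/client values and the site search term are placeholders, and the sketch simply grabs the first matching site and list:

import requests

tenantId = "..."      # placeholder
clientId = "..."      # placeholder
clientSecret = "..."  # placeholder

# exchange client credentials for a token
resp = requests.post(
    "https://login.microsoftonline.com/{}/oauth2/v2.0/token".format(tenantId),
    data={
        "grant_type": "client_credentials",
        "client_id": clientId,
        "client_secret": clientSecret,
        "scope": "https://graph.microsoft.com/.default",
    })
headers = {"Authorization": "Bearer " + resp.json()["access_token"]}

# search site => search list => get list items
site = requests.get("https://graph.microsoft.com/v1.0/sites?search=mysite",
                    headers=headers).json()
siteId = site["value"][0]["id"]

lists = requests.get("https://graph.microsoft.com/v1.0/sites/{}/lists".format(siteId),
                     headers=headers).json()
listId = lists["value"][0]["id"]

items = requests.get(
    "https://graph.microsoft.com/v1.0/sites/{}/lists/{}/items?expand=fields".format(siteId, listId),
    headers=headers).json()
for item in items["value"]:
    print(item["fields"])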



2019-12-16

Power Query for Twitter API

I am lazy, so let's make it quick. Paging through Twitter can be a bit of trouble, so I've put the Power Query script (for pulling a specific account via the timeline API) below. Hope it helps.

1. A function to get a token

let
    fnGetToken = () =>
        let 
            key = "Basic " & Binary.ToText(Text.ToBinary("{you should know what it is}"),0),
            url = "https://api.twitter.com/oauth2/token",
            getJson = Web.Contents(url,
                [
                    Headers = [#"Authorization"=key,#"Content-Type"="application/x-www-form-urlencoded;charset=UTF-8"],
                    Content = Text.ToBinary("grant_type=client_credentials") 
                ]
            ),
            j = Json.Document(getJson),
            token = j[access_token],
            tokenHeader = "bearer " & token
        in tokenHeader
in
    fnGetToken


2. Another function to get the max twitter id for the account

let
 fnGetMaxId = (token) =>
 let 
  GetJsonQuery = Web.Contents("https://api.twitter.com/1.1/statuses/user_timeline.json?screen_name={this is the account}&count=1",
   [
    Headers = [#"Authorization"=token]
   ]
  ),
  FormatAsJsonQuery = Json.Document(GetJsonQuery),
  t = Table.FromList(FormatAsJsonQuery, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
  maxId = Table.ExpandRecordColumn(t, "Column1", {"id_str"}, {"maxid"})
 in 
  maxId
in
 fnGetMaxId


3. The 3rd function gets results by token and endpoint (technically you don't need it; it just keeps the main function easy to read)

let
 fnGetList = (token, endpoint) =>
  let 
   req = Web.Contents(endpoint,
    [
     Headers = [#"Authorization"=token]
    ]
   ),
   json = Json.Document(req)
  in 
   json
in
 fnGetList


4. Finally, the main query. It uses List.Generate to page backwards through the timeline: each page's smallest id becomes the max_id of the next request. (Note: it's suggested to pass an initial id to limit the size of the result.)

let
 fn = (initId) =>
  let
   token = fnGetToken(),
   I = if initId = "" then fnGetMaxId(token){0}[maxid] else initId,
   endPoint = "https://api.twitter.com/1.1/statuses/user_timeline.json?screen_name={this is the account}&max_id=" & I & "&count=200",
   records = fnGetList(token, endPoint),
   tbl = Table.FromList(records, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
   tblMaxIds = Table.ExpandRecordColumn(tbl, "Column1", {"id_str"}, {"maxid"}),
   minId = try List.Min(tblMaxIds[maxid]) otherwise null,
   recordSet = [records = records, nextId = minId]
  in
   recordSet,
 resultSet = List.Generate(
                    () => fn(""),
                    // stop when out of pages or once past the floor id; the floor (1000000000000000000 here)
                    // is a max id you set to limit the result size, ideally taken from fnGetMaxId,
                    // otherwise the loop could return too many pages
                    each _[nextId] <> null and Number.FromText(_[nextId]) >= 1000000000000000000,
                    each fn(_[nextId]),
                    each [nextId = _[nextId], records = _[records]]
                    )
in
    resultSet