gpextprotocal.c

gpextprotocal.c

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h" 
#include "access/extprotocol.h"
#include "catalog/pg_proc.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/memutils.h" 

/* Our chosen URI format. We can change it however needed */
typedef struct DemoUri 
{ 
   char     *protocol;
   char     *path;
}  DemoUri; 
static DemoUri *ParseDemoUri(const char *uri_str);
static void FreeDemoUri(DemoUri* uri); 

/* Do the module magic dance */ 
PG_MODULE_MAGIC; 
PG_FUNCTION_INFO_V1(demoprot_export); 
PG_FUNCTION_INFO_V1(demoprot_import); 
PG_FUNCTION_INFO_V1(demoprot_validate_urls); 

Datum demoprot_export(PG_FUNCTION_ARGS); 
Datum demoprot_import(PG_FUNCTION_ARGS); 
Datum demoprot_validate_urls(PG_FUNCTION_ARGS); 
 
/* A user context that persists across calls. Can be 
declared in any other way */
typedef struct { 
  char    *url; 
  char    *filename; 
  FILE    *file; 
} extprotocol_t; 
/* 
* The read function - Import data into GPDB.
*/ 
Datum 
myprot_import(PG_FUNCTION_ARGS) 
{ 
  extprotocol_t   *myData; 
  char            *data; 
  int             datlen; 
  size_t          nread = 0; 
 
  /* Must be called via the external table format manager */ 
  if (!CALLED_AS_EXTPROTOCOL(fcinfo)) 
    elog(ERROR, "myprot_import: not called by external
       protocol manager"); 
 
  /* Get our internal description of the protocol */ 
  myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo); 
 
  if(EXTPROTOCOL_IS_LAST_CALL(fcinfo)) 
  { 
    /* we're done receiving data. close our connection */ 
    if(myData && myData->file) 
      if(fclose(myData->file)) 
        ereport(ERROR, 
          (errcode_for_file_access(), 
           errmsg("could not close file \"%s\": %m", 
               myData->filename))); 
     
    PG_RETURN_INT32(0); 
  }
  
  if (myData == NULL) 
  { 
    /* first call. do any desired init */ 

    const char    *p_name = "myprot"; 
    DemoUri       *parsed_url; 
    char          *url = EXTPROTOCOL_GET_URL(fcinfo); 
    myData        = palloc(sizeof(extprotocol_t)); 
    
    myData->url   = pstrdup(url); 
    parsed_url    = ParseDemoUri(myData->url); 
    myData->filename = pstrdup(parsed_url->path); 
    
    if(strcasecmp(parsed_url->protocol, p_name) != 0) 
      elog(ERROR, "internal error: myprot called with a
          different protocol (%s)", 
            parsed_url->protocol); 
            
    FreeDemoUri(parsed_url); 
    
    /* open the destination file (or connect to remote server in
       other cases) */ 
    myData->file = fopen(myData->filename, "r"); 
    
    if (myData->file == NULL) 
      ereport(ERROR, 
          (errcode_for_file_access(), 
           errmsg("myprot_import: could not open file \"%s\"
             for reading: %m", 
             myData->filename), 
           errOmitLocation(true))); 

    EXTPROTOCOL_SET_USER_CTX(fcinfo, myData); 
  }
  /* ========================================== 
   *          DO THE IMPORT 
   * ========================================== */ 
  data    = EXTPROTOCOL_GET_DATABUF(fcinfo); 
  datlen  = EXTPROTOCOL_GET_DATALEN(fcinfo); 
  
  /* read some bytes (with fread in this example, but normally
     in some other method over the network) */
  if(datlen > 0) 
  { 
    nread = fread(data, 1, datlen, myData->file); 
    if (ferror(myData->file)) 
      ereport(ERROR, 
        (errcode_for_file_access(), 
          errmsg("myprot_import: could not write to file
            \"%s\": %m", 
            myData->filename))); 
  }
  PG_RETURN_INT32((int)nread); 
}
/* 
 * Write function - Export data out of GPDB 
 */ 
Datum  
myprot_export(PG_FUNCTION_ARGS) 
{ 
  extprotocol_t  *myData; 
  char           *data; 
  int            datlen; 
  size_t         wrote = 0; 
  
  /* Must be called via the external table format manager */ 
  if (!CALLED_AS_EXTPROTOCOL(fcinfo)) 
    elog(ERROR, "myprot_export: not called by external
       protocol manager"); 
       
  /* Get our internal description of the protocol */ 
  myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo); 
  if(EXTPROTOCOL_IS_LAST_CALL(fcinfo)) 
  { 
    /* we're done sending data. close our connection */ 
    if(myData && myData->file) 
      if(fclose(myData->file)) 
        ereport(ERROR, 
            (errcode_for_file_access(), 
              errmsg("could not close file \"%s\": %m", 
                 myData->filename))); 
    
    PG_RETURN_INT32(0); 
  }
  if (myData == NULL) 
  { 
    /* first call. do any desired init */ 
    const char *p_name = "myprot"; 
    DemoUri    *parsed_url; 
    char       *url = EXTPROTOCOL_GET_URL(fcinfo); 
    
    myData           = palloc(sizeof(extprotocol_t)); 
    
    myData->url      = pstrdup(url); 
    parsed_url       = ParseDemoUri(myData->url); 
    myData->filename = pstrdup(parsed_url->path); 
    
    if(strcasecmp(parsed_url->protocol, p_name) != 0) 
      elog(ERROR, "internal error: myprot called with a 
         different protocol (%s)", 
         parsed_url->protocol); 
            
    FreeDemoUri(parsed_url); 
    
    /* open the destination file (or connect to remote server in
    other cases) */ 
    myData->file = fopen(myData->filename, "a"); 
    if (myData->file == NULL) 
      ereport(ERROR, 
        (errcode_for_file_access(), 
           errmsg("myprot_export: could not open file \"%s\"
             for writing: %m", 
             myData->filename), 
         errOmitLocation(true))); 
     
    EXTPROTOCOL_SET_USER_CTX(fcinfo, myData); 
  } 
  /* ======================================== 
   *      DO THE EXPORT 
   * ======================================== */ 
  data   = EXTPROTOCOL_GET_DATABUF(fcinfo); 
  datlen   = EXTPROTOCOL_GET_DATALEN(fcinfo); 
  
  if(datlen > 0) 
  { 
    wrote = fwrite(data, 1, datlen, myData->file); 
    
    if (ferror(myData->file)) 
      ereport(ERROR, 
        (errcode_for_file_access(), 
         errmsg("myprot_import: could not read from file
            \"%s\": %m", 
            myData->filename))); 
  } 
  PG_RETURN_INT32((int)wrote); 
} 
Datum  
myprot_validate_urls(PG_FUNCTION_ARGS) 
{ 
  List         *urls; 
  int          nurls; 
  int          i; 
  ValidatorDirection  direction; 
  
  /* Must be called via the external table format manager */ 
  if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo)) 
    elog(ERROR, "myprot_validate_urls: not called by external
       protocol manager");
       
  nurls       = EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo); 
  urls        = EXTPROTOCOL_VALIDATOR_GET_URL_LIST(fcinfo); 
  direction   = EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo); 
  /* 
   * Dumb example 1: search each url for a substring  
   * we don't want to be used in a url. in this example 
   * it's 'secured_directory'. 
   */ 
  for (i = 1 ; i <= nurls ; i++) 
  { 
    char *url = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, i); 
    
    if (strstr(url, "secured_directory") != 0) 
    { 
      ereport(ERROR, 
       (errcode(ERRCODE_PROTOCOL_VIOLATION), 
          errmsg("using 'secured_directory' in a url
            isn't allowed "))); 
    } 
  } 
  /* 
   * Dumb example 2: set a limit on the number of urls  
   * used. In this example we limit readable external 
   * tables that use our protocol to 2 urls max. 
   */ 
  if(direction == EXT_VALIDATE_READ && nurls > 2) 
  { 
    ereport(ERROR, 
      (errcode(ERRCODE_PROTOCOL_VIOLATION), 
        errmsg("more than 2 urls aren't allowed in this protocol "))); 
  }
  PG_RETURN_VOID(); 
}
/* --- utility functions --- */ 
static  
DemoUri *ParseDemoUri(const char *uri_str) 
{ 
  DemoUri *uri = (DemoUri *) palloc0(sizeof(DemoUri)); 
  int     protocol_len; 
  
   uri->path = NULL; 
   uri->protocol = NULL; 
  /* 
   * parse protocol 
   */ 
  char *post_protocol = strstr(uri_str, "://"); 
  
  if(!post_protocol) 
  { 
    ereport(ERROR, 
      (errcode(ERRCODE_SYNTAX_ERROR), 
       errmsg("invalid protocol URI \'%s\'", uri_str), 
       errOmitLocation(true))); 
  }
  
  protocol_len = post_protocol - uri_str; 
  uri->protocol = (char *)palloc0(protocol_len + 1); 
  strncpy(uri->protocol, uri_str, protocol_len); 
  
  /* make sure there is more to the uri string */ 
  if (strlen(uri_str) <= protocol_len) 
    ereport(ERROR, 
      (errcode(ERRCODE_SYNTAX_ERROR), 
       errmsg("invalid myprot URI \'%s\' : missing path",
         uri_str), 
      errOmitLocation(true))); 
      
  /* parse path */ 
  uri->path = pstrdup(uri_str + protocol_len + strlen("://"));
  
  return uri; 
}
static 
void FreeDemoUri(DemoUri *uri) 
{ 
  if (uri->path) 
    pfree(uri->path); 
  if (uri->protocol) 
    pfree(uri->protocol); 
   
  pfree(uri); 
}