読者です 読者をやめる 読者になる 読者になる

Pythonでもオレオレ文字コードを実装したい (3)

実装の流れ

自作エンコードPythonで扱うための実装の手順は以下の感じっぽいです。

7.2. codecs — codec レジストリと基底クラス — Python 3.3.6 ドキュメント

  1. エンコーダ・デコーダの実装
  2. エンコーダ・デコーダとcodecsモジュールに含まれているStreamWriteとStreamReaderの基底クラスを用いていい感じにStreamWriterとStreamReaderの実装
  3. 同じくエンコーダ・デコーダとcodecsモジュールに含まれているIncrementalEncoderとIncrementalDecoderの基底クラスを用いていい感じにIncrementalEncoderとIncrementalDecoderの実装(任意)
  4. 1〜3で作った物を使用してCodecInfoオブジェクトを構築
  5. 自作エンコーディング名からcodec.CodecInfoオブジェクトを検索して返す検索関数を実装
  6. 検索関数をcodecs.registerに登録
  7. たのしい

今回定義した文字符号化方式はステートレスな上にすべてがシングルバイトですから単純にバイトとUnicode文字をマップしてあげれば比較的簡単にエンコーダとデコーダを実装できそうです。

StreamReaderとSteamWriterはモジュールに含まれている基底クラスを自作したエンコーダ・デコーダで拡張してあげれば比較的簡単に実装できます。

そういえば、今回作成した符号化文字集合文字符号化方式の名前を決めていませんでした。 文字集合を『Jyuchラテンアルファベット・かな文字集合』、符号化方式を『Jyuch-Kana』という名前にしましょう。 符号化文字集合の名前はどうでもいいのですが、符号化方式の方はPythonコーデックレジストリに登録するときに名前で登録しなくてはならないので、何かしらは決めておかないといけません。

エンコーダとデコーダ

兎にも角にもまずはエンコーダとデコーダを実装しなければ話になりません。

エンコーダとデコーダはcodecsモジュールのCodec基底クラスのencode()decode()をオーバーライド(するか、同じシグネチャを持つ関数を定義)して実装します。

それぞれ詳しく見ていきましょう。ここがメインですから。

エンコードとデコード

今まで当たり前のようにエンコードとデコードという言葉を使っていましたが、意味を再確認してみましょう。

Pythonでは文字をUnicodeとして表します。 しかし、ファイルでは文字をバイト列として表現します。 つまり、Pythonでテキストファイルを扱うためにはUnicodeとして表現された文字とバイト列として表現された文字をそれぞれ変換するための何かが必要になります。

このUnicodeからバイト列へ変換を行うのがエンコーダ、逆にバイト列からUnicodeへ変換を行うのがデコーダです。

encoder(Unicode) -> バイト列
decoder(バイト列) -> Unicode

また、当たり前ですがバイト列をUnicodeにデコードした後、そのUnicodeをバイト列にエンコードしたら元のバイト列にならなくてはなりません。

decoder(encoder(Unicode)) -> 元のUnicode
encoder(decoder(バイト列)) -> 元のバイト列

エンコーダの実装

エンコーダはcodecs.Codecインターフェースの実装として定義します。

エンコーダは以下のシグネチャを持つ関数として定義されます。

encode(input[, errors])

オブジェクトinputエンコードし、(出力オブジェクト、消費したinputオブジェクトの長さ)を保持するタプルを返します。

errorsエンコード中にエラーが発生した場合のエンコーダの挙動を指定するために用いる文字列オブジェクトです。 標準がstrictですが、他にも標準ライブラリには幾つかの文字列が定義されています。

今回は正直遊びで作ってる実用性皆無の符号化方式に真面目にエラー処理を行う気がない時間の都合上strictのみの実装にし、他のを渡されても無視する事にします。

ということで、エンコーダをポンと実装します。

def encode(input, errors='strict'):
    buffer = []
    for i, it in zip(count(0), input):
        try:
            buffer.append(convert_unicode_to_hex(it))
        except ValueError:
            raise UnicodeError(
                "'jyuch-kana' codec can't encode characters in position {0}".format(i)
            )
    return bytes(buffer), len(input)


def convert_unicode_to_hex(unicode):
    if unicode == chr(0x00):
        return 0x00
    elif unicode == chr(0x0A):
        return 0x01
    elif unicode == chr(0x0D):
        return 0x02
    elif unicode == chr(0x20):
        return 0x10
    elif chr(0x41) <= unicode <= chr(0x5A):
        return ord(unicode) - 0x30
    elif chr(0x61) <= unicode <= chr(0x7A):
        return ord(unicode) - 0x36
    elif unicode == chr(0x3000):
        return 0x50
    elif chr(0x3041) <= unicode <= chr(0x3093):
        return ord(unicode) - 0x2FF0
    else:
        raise ValueError()

convert_unicode_to_hex関数でUnicode一文字を等価なバイナリ値に変換し、convert_unicode_to_hexを用いてencodeメソッドで実際に文字列からバイナリ列に変換を行います。

また、エンコードできない文字を受け取った場合はUnicodeErrorをスローします。 本当はUnicodeEncodeErrorをスローして詳細な情報を上位のメソッドに通知したほうがいいと思うのですが、UnicodeEncodeErrorをうまく生成出来ずに断念しました。 ググってもキャッチする方しか出てこず、スローする方の情報が見当たりません。 詳しいことを知っている方がいましたら教えてください。

また、お気づきの方もいると思いますがcodecs.Codecの実際のencodeメソッドencode(self, input, errors='strict')ですが、この関数にはselfがありません。が、大丈夫です。後でいい感じになんとかします。

デコーダ

同様にデコーダも実装します。

デコーダは以下のシグネチャを持つ関数として定義されます。

decode(input[, errors])

エンコーダと同様にオブジェクトinputをデコードし、(出力オブジェクト、消費したinputオブジェクトの長さ)を保持するタプルを返します。

def decode(input, errors='strict'):
    buffer = []
    for i, it in zip(count(0), input):
        try:
            buffer.append(convert_hex_to_unicode(it))
        except ValueError:
            raise UnicodeError(
                "'jyuch-kana' codec can't decode characters in position {0}".format(i)
            )
    return ''.join(buffer), len(input)


def convert_hex_to_unicode(hex):
    if hex == 0x00:
        return chr(0x00)
    elif hex == 0x01:
        return chr(0x0A)
    elif hex == 0x02:
        return chr(0x0D)
    elif hex == 0x10:
        return chr(0x20)
    elif 0x11 <= hex <= 0x2A:
        return chr(hex + 0x30)
    elif 0x2B <= hex <= 0x44:
        return chr(hex + 0x36)
    elif hex == 0x50:
        return chr(0x3000)
    elif 0x51 <= hex <= 0xA3:
        return chr(hex + 0x2FF0)
    else:
        raise ValueError()

プログラムの構造も基本的にはエンコーダと同じで、単一のバイナリを等価なUnicodeに変換する関数convert_hex_to_unicodeとそれを組み合わせてバイナリ列を等価なUnicode文字列に変換するdecode関数として定義します。

StreamReaderとStreamWriterの実装

先ほど実装したエンコーダ・デコーダを用いてStreamReaderStreamWriterのサブクラスを実装します。

これは、実際のコード見てもらったほうが簡単だと思います。

class StreamWriter(codecs.StreamWriter):
    encode = lambda self, input, errors: encode(input, errors)

class StreamReader(codecs.StreamReader):
    decode = lambda self, input, errors: decode(input, errors)

終わりです。 それぞれのencodedecodeを自作したエンコーダとデコーダで置き換えて終了です。 残りの処理についてはそれぞれの基底クラスがいい感じにやってくれます。

CodecInfoオブジェクトの生成

これまで実装したエンコーダ・デコーダ・StreamReader・StreamWriterを用いてJyuch-KanaのCodecInfoオブジェクトを構築します。 標準ライブラリではCodecInfoオブジェクトを構築して返す関数を定義するという形になっていたのでそれに従い実装します。

def getregentry():
    return codecs.CodecInfo(
        name='jyuch-kana',
        encode=lambda self, input, errors: encode(input, errors),
        decode=lambda self, input, errors: decode(input, errors),
        streamreader=StreamReader,
        streamwriter=StreamWriter,
    )

Pythonではコーデック名はすべて小文字で統一するみたいです。

検索関数の登録

ここまでくれば後はもうちょいです。

自作したコーデック名からCodecInfoオブジェクトを返す関数を実装し、それをcodecs.registerに登録すれば完了です。

def search_function(codec_name):
    if codec_name == 'jyuch-kana':
        return getregentry()
    else:
        return None

def registry():
    codecs.register(search_function)

registry()

ここでのキモは、検索関数が自分の知らないコーデック名を渡された時は下手に変なのを返すのではなく素直にNoneを返してあげることです。 そうドキュメントに書いてありました。はい。

コーデックの全体

ポイントポイントでプログラムを抜き出したので、正直わかりにくいと思います。 以下が今回実装したコーデックの全体です。

""" 'Jyuch-Kana' Codec

Written by jyuch (http://jyuch.hatenablog.com/)
"""

import codecs
from itertools import count


def encode(input, errors='strict'):
    buffer = []
    for i, it in zip(count(0), input):
        try:
            buffer.append(convert_unicode_to_hex(it))
        except ValueError:
            raise UnicodeError(
                "'jyuch-kana' codec can't encode characters in position {0}".format(i)
            )
    return bytes(buffer), len(input)


def convert_unicode_to_hex(unicode):
    if unicode == chr(0x00):
        return 0x00
    elif unicode == chr(0x0A):
        return 0x01
    elif unicode == chr(0x0D):
        return 0x02
    elif unicode == chr(0x20):
        return 0x10
    elif chr(0x41) <= unicode <= chr(0x5A):
        return ord(unicode) - 0x30
    elif chr(0x61) <= unicode <= chr(0x7A):
        return ord(unicode) - 0x36
    elif unicode == chr(0x3000):
        return 0x50
    elif chr(0x3041) <= unicode <= chr(0x3093):
        return ord(unicode) - 0x2FF0
    else:
        raise ValueError()


def decode(input, errors='strict'):
    buffer = []
    for i, it in zip(count(0), input):
        try:
            buffer.append(convert_hex_to_unicode(it))
        except ValueError:
            raise UnicodeError(
                "'jyuch-kana' codec can't decode characters in position {0}".format(i)
            )
    return ''.join(buffer), len(input)


def convert_hex_to_unicode(hex):
    if hex == 0x00:
        return chr(0x00)
    elif hex == 0x01:
        return chr(0x0A)
    elif hex == 0x02:
        return chr(0x0D)
    elif hex == 0x10:
        return chr(0x20)
    elif 0x11 <= hex <= 0x2A:
        return chr(hex + 0x30)
    elif 0x2B <= hex <= 0x44:
        return chr(hex + 0x36)
    elif hex == 0x50:
        return chr(0x3000)
    elif 0x51 <= hex <= 0xA3:
        return chr(hex + 0x2FF0)
    else:
        raise ValueError()


class StreamWriter(codecs.StreamWriter):
    encode = lambda self, input, errors: encode(input, errors)


class StreamReader(codecs.StreamReader):
    decode = lambda self, input, errors: decode(input, errors)


def getregentry():
    return codecs.CodecInfo(
        name='jyuch-kana',
        encode=lambda self, input, errors: encode(input, errors),
        decode=lambda self, input, errors: decode(input, errors),
        streamreader=StreamReader,
        streamwriter=StreamWriter,
    )


def search_function(codec_name):
    if codec_name == 'jyuch-kana':
        return getregentry()
    else:
        return None


def registry():
    codecs.register(search_function)


registry()

前回クッソ長い符号化文字集合表をダイレクトに載せといてあれですけど、このエントリも結構な長さになってしまったので実際の使い方については次の記事に書こうと思います。